From 9e27e460da090cbe028b620521e030432522ff60 Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Wed, 24 Apr 2024 14:50:39 -0700 Subject: [PATCH] [Tiered Caching] Bug fix for IndicesRequestCache StaleKey management (#13070) * Update IndicesRequestCache.java Signed-off-by: Kiran Prakash * Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash * Update CHANGELOG.md Signed-off-by: Kiran Prakash * revert Signed-off-by: Kiran Prakash * revert Signed-off-by: Kiran Prakash * Update IndicesRequestCache.java Signed-off-by: Kiran Prakash * Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash * Update IndicesRequestCache.java Signed-off-by: Kiran Prakash * Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash * Update IndicesRequestCache.java Signed-off-by: Kiran Prakash * Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash * Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash * Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash * Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash * Update IndicesRequestCache.java Signed-off-by: Kiran Prakash * Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash * Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash * Update IndicesRequestCache.java Signed-off-by: Kiran Prakash * Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash * Update IndicesRequestCache.java Signed-off-by: Kiran Prakash * Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash * code comments only Signed-off-by: Kiran Prakash * docs changes Signed-off-by: Kiran Prakash * Update CHANGELOG.md Signed-off-by: Kiran Prakash * revert catching AlreadyClosedException Signed-off-by: Kiran Prakash * assert Signed-off-by: Kiran Prakash * conflicts Signed-off-by: Kiran Prakash * Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash * Update IndicesRequestCache.java Signed-off-by: Kiran Prakash * address comments Signed-off-by: Kiran Prakash * Update IndicesRequestCache.java Signed-off-by: Kiran Prakash * Update IndicesRequestCache.java Signed-off-by: Kiran Prakash * address conflicts Signed-off-by: Kiran Prakash * spotless apply Signed-off-by: Kiran Prakash * address comments Signed-off-by: Kiran Prakash * update code comments Signed-off-by: Kiran Prakash * address bug & add tests Signed-off-by: Kiran Prakash --------- Signed-off-by: Kiran Prakash (cherry picked from commit db361ecead1d3c945371c84f4e0ea915c7abcc57) --- CHANGELOG.md | 1 + .../indices/IndicesRequestCache.java | 98 +- .../indices/IndicesRequestCacheTests.java | 973 +++++++++++------- 3 files changed, 675 insertions(+), 397 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8b32072952500..279fa1bfaacd6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -62,6 +62,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Fix issue with feature flags where default value may not be honored ([#12849](https://github.com/opensearch-project/OpenSearch/pull/12849)) - Enabled mockTelemetryPlugin for IT and fixed OOM issues ([#13054](https://github.com/opensearch-project/OpenSearch/pull/13054)) - Fix implement mark() and markSupported() in class FilterStreamInput ([#13098](https://github.com/opensearch-project/OpenSearch/pull/13098)) +- Fix IndicesRequestCache Stale calculation ([#13070](https://github.com/opensearch-project/OpenSearch/pull/13070)] - Fix snapshot _status API to return correct status for partial snapshots ([#12812](https://github.com/opensearch-project/OpenSearch/pull/12812)) - Improve the error messages for _stats with closed indices ([#13012](https://github.com/opensearch-project/OpenSearch/pull/13012)) - Ignore BaseRestHandler unconsumed content check as it's always consumed. ([#13290](https://github.com/opensearch-project/OpenSearch/pull/13290)) diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 607ff721bd357..a6a3bbbe41e5e 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -46,6 +46,7 @@ import org.opensearch.common.cache.LoadAwareCacheLoader; import org.opensearch.common.cache.RemovalListener; import org.opensearch.common.cache.RemovalNotification; +import org.opensearch.common.cache.RemovalReason; import org.opensearch.common.cache.policy.CachedQueryResult; import org.opensearch.common.cache.serializer.BytesReferenceSerializer; import org.opensearch.common.cache.service.CacheService; @@ -208,11 +209,24 @@ void clear(CacheEntity entity) { public void onRemoval(RemovalNotification notification) { // In case this event happens for an old shard, we can safely ignore this as we don't keep track for old // shards as part of request cache. +<<<<<<< HEAD Key key = notification.getKey(); cacheEntityLookup.apply(key.shardId).ifPresent(entity -> entity.onRemoval(notification)); cacheCleanupManager.updateCleanupKeyToCountMapOnCacheEviction( new CleanupKey(cacheEntityLookup.apply(key.shardId).orElse(null), key.readerCacheKeyId) ); +======= + // Pass a new removal notification containing Key rather than ICacheKey to the CacheEntity for backwards compatibility. + Key key = notification.getKey().key; + RemovalNotification newNotification = new RemovalNotification<>( + key, + notification.getValue(), + notification.getRemovalReason() + ); + cacheEntityLookup.apply(key.shardId).ifPresent(entity -> entity.onRemoval(newNotification)); + CleanupKey cleanupKey = new CleanupKey(cacheEntityLookup.apply(key.shardId).orElse(null), key.readerCacheKeyId); + cacheCleanupManager.updateStaleCountOnEntryRemoval(cleanupKey, newNotification); +>>>>>>> db361ecead1 ([Tiered Caching] Bug fix for IndicesRequestCache StaleKey management (#13070)) } BytesReference getOrCompute( @@ -241,10 +255,11 @@ BytesReference getOrCompute( OpenSearchDirectoryReader.addReaderCloseListener(reader, cleanupKey); } } - cacheCleanupManager.updateCleanupKeyToCountMapOnCacheInsertion(cleanupKey); + cacheCleanupManager.updateStaleCountOnCacheInsert(cleanupKey); } else { cacheEntity.onHit(); } + return value; } @@ -477,7 +492,7 @@ void enqueueCleanupKey(CleanupKey cleanupKey) { * * @param cleanupKey the CleanupKey to be updated in the map */ - private void updateCleanupKeyToCountMapOnCacheInsertion(CleanupKey cleanupKey) { + private void updateStaleCountOnCacheInsert(CleanupKey cleanupKey) { if (stalenessThreshold == 0.0 || cleanupKey.entity == null) { return; } @@ -493,8 +508,29 @@ private void updateCleanupKeyToCountMapOnCacheInsertion(CleanupKey cleanupKey) { cleanupKeyToCountMap.computeIfAbsent(shardId, k -> new HashMap<>()).merge(cleanupKey.readerCacheKeyId, 1, Integer::sum); } - private void updateCleanupKeyToCountMapOnCacheEviction(CleanupKey cleanupKey) { - if (stalenessThreshold == 0.0 || cleanupKey.entity == null) { + /** + * Handles the eviction of a cache entry. + * + *

This method is called when an entry is evicted from the cache. + * We consider all removal notifications except with the reason Replaced + * {@link #incrementStaleKeysCount} would have removed the entries from the map and increment the {@link #staleKeysCount} + * Hence we decrement {@link #staleKeysCount} if we do not find the shardId or readerCacheKeyId in the map. + * Skip decrementing staleKeysCount if we find the shardId or readerCacheKeyId in the map since it would have not been accounted for in the staleKeysCount in + * + * @param cleanupKey the CleanupKey that has been evicted from the cache + * @param notification RemovalNotification of the cache entry evicted + */ + private void updateStaleCountOnEntryRemoval(CleanupKey cleanupKey, RemovalNotification notification) { + if (notification.getRemovalReason() == RemovalReason.REPLACED) { + // The reason of the notification is REPLACED when a cache entry's value is updated, since replacing an entry + // does not affect the staleness count, we skip such notifications. + return; + } + if (cleanupKey.entity == null) { + // entity will only be null when the shard is closed/deleted + // we would have accounted this in staleKeysCount when the closing/deletion of shard would have closed the associated + // readers + staleKeysCount.decrementAndGet(); return; } IndexShard indexShard = (IndexShard) cleanupKey.entity.getCacheIdentity(); @@ -504,15 +540,33 @@ private void updateCleanupKeyToCountMapOnCacheEviction(CleanupKey cleanupKey) { } ShardId shardId = indexShard.shardId(); - cleanupKeyToCountMap.computeIfPresent(shardId, (shard, keyCountMap) -> { - keyCountMap.computeIfPresent(cleanupKey.readerCacheKeyId, (key, currentValue) -> { - // decrement the stale key count + cleanupKeyToCountMap.compute(shardId, (key, readerCacheKeyMap) -> { + if (readerCacheKeyMap == null || !readerCacheKeyMap.containsKey(cleanupKey.readerCacheKeyId)) { + // If ShardId is not present or readerCacheKeyId is not present + // it should have already been accounted for and hence been removed from this map + // so decrement staleKeysCount staleKeysCount.decrementAndGet(); - int newValue = currentValue - 1; - // Remove the key if the new value is zero by returning null; otherwise, update with the new value. - return newValue == 0 ? null : newValue; - }); - return keyCountMap; + // Return the current map + return readerCacheKeyMap; + } else { + // If it is in the map, it is not stale yet. + // Proceed to adjust the count for the readerCacheKeyId in the map + // but do not decrement the staleKeysCount + Integer count = readerCacheKeyMap.get(cleanupKey.readerCacheKeyId); + // this should never be null + assert (count != null && count >= 0); + // Reduce the count by 1 + int newCount = count - 1; + if (newCount > 0) { + // Update the map with the new count + readerCacheKeyMap.put(cleanupKey.readerCacheKeyId, newCount); + } else { + // Remove the readerCacheKeyId entry if new count is zero + readerCacheKeyMap.remove(cleanupKey.readerCacheKeyId); + } + // If after modification, the readerCacheKeyMap is empty, we return null to remove the ShardId entry + return readerCacheKeyMap.isEmpty() ? null : readerCacheKeyMap; + } }); } @@ -520,7 +574,7 @@ private void updateCleanupKeyToCountMapOnCacheEviction(CleanupKey cleanupKey) { * Updates the count of stale keys in the cache. * This method is called when a CleanupKey is added to the keysToClean set. * - * It increments the staleKeysCount by the count of the CleanupKey in the cleanupKeyToCountMap. + *

It increments the staleKeysCount by the count of the CleanupKey in the cleanupKeyToCountMap. * If the CleanupKey's readerCacheKeyId is null or the CleanupKey's entity is not open, it increments the staleKeysCount * by the total count of keys associated with the CleanupKey's ShardId in the cleanupKeyToCountMap and removes the ShardId from the map. * @@ -538,7 +592,7 @@ private void incrementStaleKeysCount(CleanupKey cleanupKey) { ShardId shardId = indexShard.shardId(); // Using computeIfPresent to atomically operate on the countMap for a given shardId - cleanupKeyToCountMap.computeIfPresent(shardId, (key, countMap) -> { + cleanupKeyToCountMap.computeIfPresent(shardId, (currentShardId, countMap) -> { if (cleanupKey.readerCacheKeyId == null) { // Aggregate and add to staleKeysCount atomically if readerCacheKeyId is null int totalSum = countMap.values().stream().mapToInt(Integer::intValue).sum(); @@ -547,18 +601,19 @@ private void incrementStaleKeysCount(CleanupKey cleanupKey) { return null; } else { // Update staleKeysCount based on specific readerCacheKeyId, then remove it from the countMap - countMap.computeIfPresent(cleanupKey.readerCacheKeyId, (k, v) -> { - staleKeysCount.addAndGet(v); + countMap.computeIfPresent(cleanupKey.readerCacheKeyId, (readerCacheKey, count) -> { + staleKeysCount.addAndGet(count); // Return null to remove the key after updating staleKeysCount return null; }); - // Check if countMap is empty after removal to decide if we need to remove the shardId entry if (countMap.isEmpty()) { - return null; // Returning null removes the entry for shardId + // Returning null removes the entry for shardId + return null; } } - return countMap; // Return the modified countMap to keep the mapping + // Return the modified countMap to retain updates + return countMap; }); } @@ -673,6 +728,11 @@ public void close() { this.cacheCleaner.close(); } + // for testing + ConcurrentMap> getCleanupKeyToCountMap() { + return cleanupKeyToCountMap; + } + private final class IndicesRequestCacheCleaner implements Runnable, Releasable { private final IndicesRequestCacheCleanupManager cacheCleanupManager; diff --git a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java index 68da79a7fda84..7fae705c0d2cc 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java @@ -38,7 +38,6 @@ import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.IndexWriter; -import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.Term; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.TermQuery; @@ -49,7 +48,11 @@ import org.opensearch.common.cache.RemovalNotification; import org.opensearch.common.cache.RemovalReason; import org.opensearch.common.cache.module.CacheModule; +<<<<<<< HEAD import org.opensearch.common.cache.service.CacheService; +======= +import org.opensearch.common.cache.stats.ImmutableCacheStats; +>>>>>>> db361ecead1 ([Tiered Caching] Bug fix for IndicesRequestCache StaleKey management (#13070)) import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.lucene.index.OpenSearchDirectoryReader; import org.opensearch.common.settings.Settings; @@ -73,45 +76,61 @@ import org.opensearch.test.ClusterServiceUtils; import org.opensearch.test.OpenSearchSingleNodeTestCase; import org.opensearch.threadpool.ThreadPool; +import org.junit.After; +import org.junit.Before; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +<<<<<<< HEAD +======= +import java.util.HashMap; +import java.util.List; +>>>>>>> db361ecead1 ([Tiered Caching] Bug fix for IndicesRequestCache StaleKey management (#13070)) import java.util.Optional; import java.util.UUID; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; import static org.opensearch.indices.IndicesRequestCache.INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class IndicesRequestCacheTests extends OpenSearchSingleNodeTestCase { + private ThreadPool threadPool; + private IndexWriter writer; + private Directory dir; + private IndicesRequestCache cache; + private IndexShard indexShard; + private ThreadPool getThreadPool() { return new ThreadPool(Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "default tracer tests").build()); } - public void testBasicOperationsCache() throws Exception { - IndexShard indexShard = createIndex("test").getShard(0); - ThreadPool threadPool = getThreadPool(); - IndicesRequestCache cache = new IndicesRequestCache( - Settings.EMPTY, - (shardId -> Optional.of(new IndicesService.IndexShardCacheEntity(indexShard))), - new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService(), - threadPool, - ClusterServiceUtils.createClusterService(threadPool) - ); - Directory dir = newDirectory(); - IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); + @Before + public void setup() throws IOException { + dir = newDirectory(); + writer = new IndexWriter(dir, newIndexWriterConfig()); + indexShard = createIndex("test").getShard(0); + } + + @After + public void cleanup() throws IOException { + IOUtils.close(writer, dir, cache); + terminate(threadPool); + } + public void testBasicOperationsCache() throws Exception { + threadPool = getThreadPool(); + cache = getIndicesRequestCache(Settings.EMPTY); writer.addDocument(newDoc(0, "foo")); - DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); - BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); + DirectoryReader reader = getReader(writer, indexShard.shardId()); // initial cache IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); Loader loader = new Loader(reader, 0); - BytesReference value = cache.getOrCompute(entity, loader, reader, termBytes); + BytesReference value = cache.getOrCompute(entity, loader, reader, getTermBytes()); assertEquals("foo", value.streamInput().readString()); ShardRequestCache requestCacheStats = indexShard.requestCache(); assertEquals(0, requestCacheStats.stats().getHitCount()); @@ -123,7 +142,7 @@ public void testBasicOperationsCache() throws Exception { // cache hit entity = new IndicesService.IndexShardCacheEntity(indexShard); loader = new Loader(reader, 0); - value = cache.getOrCompute(entity, loader, reader, termBytes); + value = cache.getOrCompute(entity, loader, reader, getTermBytes()); assertEquals("foo", value.streamInput().readString()); requestCacheStats = indexShard.requestCache(); assertEquals(1, requestCacheStats.stats().getHitCount()); @@ -149,34 +168,21 @@ public void testBasicOperationsCache() throws Exception { assertEquals(0, cache.count()); assertEquals(0, requestCacheStats.stats().getMemorySize().bytesAsInt()); - IOUtils.close(reader, writer, dir, cache); - terminate(threadPool); + IOUtils.close(reader); assertEquals(0, cache.numRegisteredCloseListeners()); } public void testBasicOperationsCacheWithFeatureFlag() throws Exception { - IndexShard indexShard = createIndex("test").getShard(0); - CacheService cacheService = new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService(); - ThreadPool threadPool = getThreadPool(); - IndicesRequestCache cache = new IndicesRequestCache( - Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.PLUGGABLE_CACHE, "true").build(), - (shardId -> Optional.of(new IndicesService.IndexShardCacheEntity(indexShard))), - cacheService, - threadPool, - ClusterServiceUtils.createClusterService(threadPool) - ); - Directory dir = newDirectory(); - IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); - + threadPool = getThreadPool(); + Settings settings = Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.PLUGGABLE_CACHE, "true").build(); + cache = getIndicesRequestCache(settings); writer.addDocument(newDoc(0, "foo")); - DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); - BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); + DirectoryReader reader = getReader(writer, indexShard.shardId()); // initial cache IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); Loader loader = new Loader(reader, 0); - BytesReference value = cache.getOrCompute(entity, loader, reader, termBytes); + BytesReference value = cache.getOrCompute(entity, loader, reader, getTermBytes()); assertEquals("foo", value.streamInput().readString()); ShardRequestCache requestCacheStats = indexShard.requestCache(); assertEquals(0, requestCacheStats.stats().getHitCount()); @@ -188,7 +194,7 @@ public void testBasicOperationsCacheWithFeatureFlag() throws Exception { // cache hit entity = new IndicesService.IndexShardCacheEntity(indexShard); loader = new Loader(reader, 0); - value = cache.getOrCompute(entity, loader, reader, termBytes); + value = cache.getOrCompute(entity, loader, reader, getTermBytes()); assertEquals("foo", value.streamInput().readString()); requestCacheStats = indexShard.requestCache(); assertEquals(1, requestCacheStats.stats().getHitCount()); @@ -214,47 +220,28 @@ public void testBasicOperationsCacheWithFeatureFlag() throws Exception { assertEquals(0, cache.count()); assertEquals(0, requestCacheStats.stats().getMemorySize().bytesAsInt()); - IOUtils.close(reader, writer, dir, cache); - terminate(threadPool); + IOUtils.close(reader); assertEquals(0, cache.numRegisteredCloseListeners()); } public void testCacheDifferentReaders() throws Exception { - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexShard indexShard = createIndex("test").getShard(0); - ThreadPool threadPool = getThreadPool(); - IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, (shardId -> { - IndexService indexService = null; - try { - indexService = indicesService.indexServiceSafe(shardId.getIndex()); - } catch (IndexNotFoundException ex) { - return Optional.empty(); - } - return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); - }), - new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService(), - threadPool, - ClusterServiceUtils.createClusterService(threadPool) - ); - Directory dir = newDirectory(); - IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); - + threadPool = getThreadPool(); + cache = getIndicesRequestCache(Settings.EMPTY); writer.addDocument(newDoc(0, "foo")); - DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); - BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); + DirectoryReader reader = getReader(writer, indexShard.shardId()); + if (randomBoolean()) { writer.flush(); IOUtils.close(writer); writer = new IndexWriter(dir, newIndexWriterConfig()); } writer.updateDocument(new Term("id", "0"), newDoc(0, "bar")); - DirectoryReader secondReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); + DirectoryReader secondReader = getReader(writer, indexShard.shardId()); // initial cache IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); Loader loader = new Loader(reader, 0); - BytesReference value = cache.getOrCompute(entity, loader, reader, termBytes); + BytesReference value = cache.getOrCompute(entity, loader, reader, getTermBytes()); ShardRequestCache requestCacheStats = entity.stats(); assertEquals("foo", value.streamInput().readString()); assertEquals(0, requestCacheStats.stats().getHitCount()); @@ -269,7 +256,7 @@ public void testCacheDifferentReaders() throws Exception { // cache the second IndicesService.IndexShardCacheEntity secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); loader = new Loader(secondReader, 0); - value = cache.getOrCompute(entity, loader, secondReader, termBytes); + value = cache.getOrCompute(entity, loader, secondReader, getTermBytes()); requestCacheStats = entity.stats(); assertEquals("bar", value.streamInput().readString()); assertEquals(0, requestCacheStats.stats().getHitCount()); @@ -282,7 +269,7 @@ public void testCacheDifferentReaders() throws Exception { secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); loader = new Loader(secondReader, 0); - value = cache.getOrCompute(secondEntity, loader, secondReader, termBytes); + value = cache.getOrCompute(secondEntity, loader, secondReader, getTermBytes()); requestCacheStats = entity.stats(); assertEquals("bar", value.streamInput().readString()); assertEquals(1, requestCacheStats.stats().getHitCount()); @@ -293,7 +280,7 @@ public void testCacheDifferentReaders() throws Exception { entity = new IndicesService.IndexShardCacheEntity(indexShard); loader = new Loader(reader, 0); - value = cache.getOrCompute(entity, loader, reader, termBytes); + value = cache.getOrCompute(entity, loader, reader, getTermBytes()); assertEquals("foo", value.streamInput().readString()); requestCacheStats = entity.stats(); assertEquals(2, requestCacheStats.stats().getHitCount()); @@ -326,8 +313,7 @@ public void testCacheDifferentReaders() throws Exception { assertEquals(0, cache.count()); assertEquals(0, requestCacheStats.stats().getMemorySize().bytesAsInt()); - IOUtils.close(secondReader, writer, dir, cache); - terminate(threadPool); + IOUtils.close(secondReader); assertEquals(0, cache.numRegisteredCloseListeners()); } @@ -354,55 +340,20 @@ public void testCacheCleanupThresholdSettingValidator_Invalid_Percentage() { assertThrows(IllegalArgumentException.class, () -> { IndicesRequestCache.validateStalenessSetting("500%"); }); } + // when staleness threshold is zero, stale keys should be cleaned up every time cache cleaner is invoked. public void testCacheCleanupBasedOnZeroThreshold() throws Exception { - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexShard indexShard = createIndex("test").getShard(0); - ThreadPool threadPool = getThreadPool(); + threadPool = getThreadPool(); Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0%").build(); - IndicesRequestCache cache = new IndicesRequestCache(settings, (shardId -> { - IndexService indexService = null; - try { - indexService = indicesService.indexServiceSafe(shardId.getIndex()); - } catch (IndexNotFoundException ex) { - return Optional.empty(); - } - return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); - }), - new CacheModule(new ArrayList<>(), settings).getCacheService(), - threadPool, - ClusterServiceUtils.createClusterService(threadPool) - ); - Directory dir = newDirectory(); - IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); - + cache = getIndicesRequestCache(settings); writer.addDocument(newDoc(0, "foo")); - DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); - BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); - if (randomBoolean()) { - writer.flush(); - IOUtils.close(writer); - writer = new IndexWriter(dir, newIndexWriterConfig()); - } - writer.updateDocument(new Term("id", "0"), newDoc(0, "bar")); - DirectoryReader secondReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); + DirectoryReader reader = getReader(writer, indexShard.shardId()); + DirectoryReader secondReader = getReader(writer, indexShard.shardId()); // Get 2 entries into the cache - IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); - Loader loader = new Loader(reader, 0); - cache.getOrCompute(entity, loader, reader, termBytes); - - entity = new IndicesService.IndexShardCacheEntity(indexShard); - loader = new Loader(reader, 0); - cache.getOrCompute(entity, loader, reader, termBytes); - - IndicesService.IndexShardCacheEntity secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); - loader = new Loader(secondReader, 0); - cache.getOrCompute(entity, loader, secondReader, termBytes); + cache.getOrCompute(getEntity(indexShard), getLoader(reader), reader, getTermBytes()); + assertEquals(1, cache.count()); - secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); - loader = new Loader(secondReader, 0); - cache.getOrCompute(secondEntity, loader, secondReader, termBytes); + cache.getOrCompute(getEntity(indexShard), getLoader(secondReader), secondReader, getTermBytes()); assertEquals(2, cache.count()); // Close the reader, to be enqueued for cleanup @@ -414,132 +365,32 @@ public void testCacheCleanupBasedOnZeroThreshold() throws Exception { cache.cacheCleanupManager.cleanCache(); // cleanup should remove the stale-key assertEquals(1, cache.count()); - - IOUtils.close(secondReader, writer, dir, cache); - terminate(threadPool); + IOUtils.close(secondReader); } - public void testCacheCleanupBasedOnStaleThreshold_StalenessEqualToThreshold() throws Exception { - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexShard indexShard = createIndex("test").getShard(0); - ThreadPool threadPool = getThreadPool(); - Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.5").build(); - IndicesRequestCache cache = new IndicesRequestCache(settings, (shardId -> { - IndexService indexService = null; - try { - indexService = indicesService.indexServiceSafe(shardId.getIndex()); - } catch (IndexNotFoundException ex) { - return Optional.empty(); - } - return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); - }), - new CacheModule(new ArrayList<>(), settings).getCacheService(), - threadPool, - ClusterServiceUtils.createClusterService(threadPool) - ); - Directory dir = newDirectory(); - IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); + // when staleness count is higher than stale threshold, stale keys should be cleaned up. + public void testCacheCleanupBasedOnStaleThreshold_StalenessHigherThanThreshold() throws Exception { + threadPool = getThreadPool(); + Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.49").build(); + cache = getIndicesRequestCache(settings); writer.addDocument(newDoc(0, "foo")); - DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); - BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); - if (randomBoolean()) { - writer.flush(); - IOUtils.close(writer); - writer = new IndexWriter(dir, newIndexWriterConfig()); - } - writer.updateDocument(new Term("id", "0"), newDoc(0, "bar")); - DirectoryReader secondReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); + DirectoryReader reader = getReader(writer, indexShard.shardId()); + DirectoryReader secondReader = getReader(writer, indexShard.shardId()); // Get 2 entries into the cache - IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); - Loader loader = new Loader(reader, 0); - cache.getOrCompute(entity, loader, reader, termBytes); - - entity = new IndicesService.IndexShardCacheEntity(indexShard); - loader = new Loader(reader, 0); - cache.getOrCompute(entity, loader, reader, termBytes); - - IndicesService.IndexShardCacheEntity secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); - loader = new Loader(secondReader, 0); - cache.getOrCompute(entity, loader, secondReader, termBytes); - - secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); - loader = new Loader(secondReader, 0); - cache.getOrCompute(secondEntity, loader, secondReader, termBytes); - assertEquals(2, cache.count()); - - // Close the reader, to be enqueued for cleanup - // 1 out of 2 keys ie 50% are now stale. - reader.close(); - // cache count should not be affected - assertEquals(2, cache.count()); - - // clean cache with 50% staleness threshold - cache.cacheCleanupManager.cleanCache(); - // cleanup should have taken effect + cache.getOrCompute(getEntity(indexShard), getLoader(reader), reader, getTermBytes()); assertEquals(1, cache.count()); - IOUtils.close(secondReader, writer, dir, cache); - terminate(threadPool); - } - - public void testStaleCount_OnRemovalNotificationOfStaleKey_DecrementsStaleCount() throws Exception { - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexShard indexShard = createIndex("test").getShard(0); - ThreadPool threadPool = getThreadPool(); - Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.51").build(); - IndicesRequestCache cache = new IndicesRequestCache(settings, (shardId -> { - IndexService indexService = null; - try { - indexService = indicesService.indexServiceSafe(shardId.getIndex()); - } catch (IndexNotFoundException ex) { - return Optional.empty(); - } - return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); - }), - new CacheModule(new ArrayList<>(), settings).getCacheService(), - threadPool, - ClusterServiceUtils.createClusterService(threadPool) - ); - Directory dir = newDirectory(); - IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); - - writer.addDocument(newDoc(0, "foo")); - DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); - BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); - if (randomBoolean()) { - writer.flush(); - IOUtils.close(writer); - writer = new IndexWriter(dir, newIndexWriterConfig()); - } - writer.updateDocument(new Term("id", "0"), newDoc(0, "bar")); - DirectoryReader secondReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - - // Get 2 entries into the cache - IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); - Loader loader = new Loader(reader, 0); - cache.getOrCompute(entity, loader, reader, termBytes); - - entity = new IndicesService.IndexShardCacheEntity(indexShard); - loader = new Loader(reader, 0); - cache.getOrCompute(entity, loader, reader, termBytes); - - IndicesService.IndexShardCacheEntity secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); - loader = new Loader(secondReader, 0); - cache.getOrCompute(entity, loader, secondReader, termBytes); - - secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); - loader = new Loader(secondReader, 0); - cache.getOrCompute(secondEntity, loader, secondReader, termBytes); + cache.getOrCompute(getEntity(indexShard), getLoader(secondReader), secondReader, getTermBytes()); assertEquals(2, cache.count()); + // no stale keys so far + assertEquals(0, cache.cacheCleanupManager.getStaleKeysCount().get()); // Close the reader, to be enqueued for cleanup reader.close(); - AtomicInteger staleKeysCount = cache.cacheCleanupManager.getStaleKeysCount(); // 1 out of 2 keys ie 50% are now stale. +<<<<<<< HEAD assertEquals(1, staleKeysCount.get()); // cache count should not be affected assertEquals(2, cache.count()); @@ -693,6 +544,9 @@ public void testCacheCleanupBasedOnStaleThreshold_StalenessGreaterThanThreshold( // Close the reader, to be enqueued for cleanup // 1 out of 2 keys ie 50% are now stale. reader.close(); +======= + assertEquals(1, cache.cacheCleanupManager.getStaleKeysCount().get()); +>>>>>>> db361ecead1 ([Tiered Caching] Bug fix for IndicesRequestCache StaleKey management (#13070)) // cache count should not be affected assertEquals(2, cache.count()); @@ -700,17 +554,370 @@ public void testCacheCleanupBasedOnStaleThreshold_StalenessGreaterThanThreshold( cache.cacheCleanupManager.cleanCache(); // cleanup should have taken effect with 49% threshold assertEquals(1, cache.count()); + assertEquals(0, cache.cacheCleanupManager.getStaleKeysCount().get()); - IOUtils.close(secondReader, writer, dir, cache); - terminate(threadPool); + IOUtils.close(secondReader); + } + + // when staleness count equal to stale threshold, stale keys should be cleaned up. + public void testCacheCleanupBasedOnStaleThreshold_StalenessEqualToThreshold() throws Exception { + threadPool = getThreadPool(); + Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.5").build(); + cache = getIndicesRequestCache(settings); + writer.addDocument(newDoc(0, "foo")); + DirectoryReader reader = getReader(writer, indexShard.shardId()); + DirectoryReader secondReader = getReader(writer, indexShard.shardId()); + + // Get 2 entries into the cache + cache.getOrCompute(getEntity(indexShard), getLoader(reader), reader, getTermBytes()); + assertEquals(1, cache.count()); + + cache.getOrCompute(getEntity(indexShard), getLoader(secondReader), secondReader, getTermBytes()); + assertEquals(2, cache.count()); + + // Close the reader, to be enqueued for cleanup + reader.close(); + // 1 out of 2 keys ie 50% are now stale. + assertEquals(1, cache.cacheCleanupManager.getStaleKeysCount().get()); + // cache count should not be affected + assertEquals(2, cache.count()); + + // clean cache with 50% staleness threshold + cache.cacheCleanupManager.cleanCache(); + // cleanup should have taken effect + assertEquals(0, cache.cacheCleanupManager.getStaleKeysCount().get()); + assertEquals(1, cache.count()); + + IOUtils.close(secondReader); + } + + // when a cache entry that is Stale is evicted for any reason, we have to deduct the count from our staleness count + public void testStaleCount_OnRemovalNotificationOfStaleKey_DecrementsStaleCount() throws Exception { + threadPool = getThreadPool(); + Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.51").build(); + cache = getIndicesRequestCache(settings); + writer.addDocument(newDoc(0, "foo")); + ShardId shardId = indexShard.shardId(); + DirectoryReader reader = getReader(writer, indexShard.shardId()); + DirectoryReader secondReader = getReader(writer, indexShard.shardId()); + + // Get 2 entries into the cache from 2 different readers + cache.getOrCompute(getEntity(indexShard), getLoader(reader), reader, getTermBytes()); + assertEquals(1, cache.count()); + + cache.getOrCompute(getEntity(indexShard), getLoader(secondReader), secondReader, getTermBytes()); + assertEquals(2, cache.count()); + + // assert no stale keys are accounted so far + assertEquals(0, cache.cacheCleanupManager.getStaleKeysCount().get()); + // Close the reader, this should create a stale key + reader.close(); + // 1 out of 2 keys ie 50% are now stale. + assertEquals(1, cache.cacheCleanupManager.getStaleKeysCount().get()); + // cache count should not be affected + assertEquals(2, cache.count()); + + IndicesRequestCache.Key key = new IndicesRequestCache.Key(indexShard.shardId(), getTermBytes(), getReaderCacheKeyId(reader)); + // test the mapping + ConcurrentMap> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap(); + // shard id should exist + assertTrue(cleanupKeyToCountMap.containsKey(shardId)); + // reader CacheKeyId should NOT exist + assertFalse(cleanupKeyToCountMap.get(shardId).containsKey(getReaderCacheKeyId(reader))); + // secondReader CacheKeyId should exist + assertTrue(cleanupKeyToCountMap.get(shardId).containsKey(getReaderCacheKeyId(secondReader))); + + cache.onRemoval( + new RemovalNotification, BytesReference>( + new ICacheKey<>(key), + getTermBytes(), + RemovalReason.EVICTED + ) + ); + + // test the mapping, it should stay the same + // shard id should exist + assertTrue(cleanupKeyToCountMap.containsKey(shardId)); + // reader CacheKeyId should NOT exist + assertFalse(cleanupKeyToCountMap.get(shardId).containsKey(getReaderCacheKeyId(reader))); + // secondReader CacheKeyId should exist + assertTrue(cleanupKeyToCountMap.get(shardId).containsKey(getReaderCacheKeyId(secondReader))); + // eviction of previous stale key from the cache should decrement staleKeysCount in iRC + assertEquals(0, cache.cacheCleanupManager.getStaleKeysCount().get()); + + IOUtils.close(secondReader); + } + + // when a cache entry that is NOT Stale is evicted for any reason, staleness count should NOT be deducted + public void testStaleCount_OnRemovalNotificationOfNonStaleKey_DoesNotDecrementsStaleCount() throws Exception { + threadPool = getThreadPool(); + Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.51").build(); + cache = getIndicesRequestCache(settings); + writer.addDocument(newDoc(0, "foo")); + ShardId shardId = indexShard.shardId(); + DirectoryReader reader = getReader(writer, indexShard.shardId()); + DirectoryReader secondReader = getReader(writer, indexShard.shardId()); + + // Get 2 entries into the cache + cache.getOrCompute(getEntity(indexShard), getLoader(reader), reader, getTermBytes()); + assertEquals(1, cache.count()); + + cache.getOrCompute(getEntity(indexShard), getLoader(secondReader), secondReader, getTermBytes()); + assertEquals(2, cache.count()); + + // Close the reader, to be enqueued for cleanup + reader.close(); + AtomicInteger staleKeysCount = cache.cacheCleanupManager.getStaleKeysCount(); + // 1 out of 2 keys ie 50% are now stale. + assertEquals(1, staleKeysCount.get()); + // cache count should not be affected + assertEquals(2, cache.count()); + + // evict entry from second reader (this reader is not closed) + IndicesRequestCache.Key key = new IndicesRequestCache.Key(indexShard.shardId(), getTermBytes(), getReaderCacheKeyId(secondReader)); + + // test the mapping + ConcurrentMap> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap(); + // shard id should exist + assertTrue(cleanupKeyToCountMap.containsKey(shardId)); + // reader CacheKeyId should NOT exist + assertFalse(cleanupKeyToCountMap.get(shardId).containsKey(getReaderCacheKeyId(reader))); + // secondReader CacheKeyId should exist + assertTrue(cleanupKeyToCountMap.get(shardId).containsKey(getReaderCacheKeyId(secondReader))); + + cache.onRemoval( + new RemovalNotification, BytesReference>( + new ICacheKey<>(key), + getTermBytes(), + RemovalReason.EVICTED + ) + ); + + // test the mapping, shardId entry should be cleaned up + // shard id should NOT exist + assertFalse(cleanupKeyToCountMap.containsKey(shardId)); + + staleKeysCount = cache.cacheCleanupManager.getStaleKeysCount(); + // eviction of NON-stale key from the cache should NOT decrement staleKeysCount in iRC + assertEquals(1, staleKeysCount.get()); + + IOUtils.close(secondReader); + } + + // when a cache entry that is NOT Stale is evicted WITHOUT its reader closing, we should NOT deduct it from staleness count + public void testStaleCount_WithoutReaderClosing_DecrementsStaleCount() throws Exception { + threadPool = getThreadPool(); + Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.51").build(); + cache = getIndicesRequestCache(settings); + + writer.addDocument(newDoc(0, "foo")); + DirectoryReader reader = getReader(writer, indexShard.shardId()); + DirectoryReader secondReader = getReader(writer, indexShard.shardId()); + + // Get 2 entries into the cache from 2 different readers + cache.getOrCompute(getEntity(indexShard), getLoader(reader), reader, getTermBytes()); + assertEquals(1, cache.count()); + + cache.getOrCompute(getEntity(indexShard), getLoader(secondReader), secondReader, getTermBytes()); + assertEquals(2, cache.count()); + + // no keys are stale + assertEquals(0, cache.cacheCleanupManager.getStaleKeysCount().get()); + // create notification for removal of non-stale entry + IndicesRequestCache.Key key = new IndicesRequestCache.Key(indexShard.shardId(), getTermBytes(), getReaderCacheKeyId(reader)); + cache.onRemoval( + new RemovalNotification, BytesReference>( + new ICacheKey<>(key), + getTermBytes(), + RemovalReason.EVICTED + ) + ); + // stale keys count should stay zero + assertEquals(0, cache.cacheCleanupManager.getStaleKeysCount().get()); + + IOUtils.close(reader, secondReader); + } + + // test staleness count based on removal notifications + public void testStaleCount_OnRemovalNotifications() throws Exception { + threadPool = getThreadPool(); + Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.51").build(); + cache = getIndicesRequestCache(settings); + + writer.addDocument(newDoc(0, "foo")); + DirectoryReader reader = getReader(writer, indexShard.shardId()); + + // Get 5 entries into the cache + int totalKeys = 5; + IndicesService.IndexShardCacheEntity entity = null; + TermQueryBuilder termQuery = null; + BytesReference termBytes = null; + for (int i = 1; i <= totalKeys; i++) { + termQuery = new TermQueryBuilder("id", "" + i); + termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); + entity = new IndicesService.IndexShardCacheEntity(indexShard); + Loader loader = new Loader(reader, 0); + cache.getOrCompute(entity, loader, reader, termBytes); + assertEquals(i, cache.count()); + } + // no keys are stale yet + assertEquals(0, cache.cacheCleanupManager.getStaleKeysCount().get()); + // closing the reader should make all keys stale + reader.close(); + assertEquals(totalKeys, cache.cacheCleanupManager.getStaleKeysCount().get()); + + String readerCacheKeyId = getReaderCacheKeyId(reader); + IndicesRequestCache.Key key = new IndicesRequestCache.Key( + ((IndexShard) entity.getCacheIdentity()).shardId(), + termBytes, + readerCacheKeyId + ); + + int staleCount = cache.cacheCleanupManager.getStaleKeysCount().get(); + // Notification for Replaced should not deduct the staleCount + cache.onRemoval( + new RemovalNotification, BytesReference>( + new ICacheKey<>(key), + getTermBytes(), + RemovalReason.REPLACED + ) + ); + // stale keys count should stay the same + assertEquals(staleCount, cache.cacheCleanupManager.getStaleKeysCount().get()); + + // Notification for all but Replaced should deduct the staleCount + RemovalReason[] reasons = { RemovalReason.INVALIDATED, RemovalReason.EVICTED, RemovalReason.EXPLICIT, RemovalReason.CAPACITY }; + for (RemovalReason reason : reasons) { + cache.onRemoval( + new RemovalNotification, BytesReference>(new ICacheKey<>(key), getTermBytes(), reason) + ); + assertEquals(--staleCount, cache.cacheCleanupManager.getStaleKeysCount().get()); + } } + // when staleness count less than the stale threshold, stale keys should NOT be cleaned up. public void testCacheCleanupBasedOnStaleThreshold_StalenessLesserThanThreshold() throws Exception { - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexShard indexShard = createIndex("test").getShard(0); - ThreadPool threadPool = getThreadPool(); + threadPool = getThreadPool(); Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "51%").build(); - IndicesRequestCache cache = new IndicesRequestCache(settings, (shardId -> { + cache = getIndicesRequestCache(settings); + + writer.addDocument(newDoc(0, "foo")); + DirectoryReader reader = getReader(writer, indexShard.shardId()); + DirectoryReader secondReader = getReader(writer, indexShard.shardId()); + + // Get 2 entries into the cache + cache.getOrCompute(getEntity(indexShard), getLoader(reader), reader, getTermBytes()); + assertEquals(1, cache.count()); + + cache.getOrCompute(getEntity(indexShard), getLoader(secondReader), secondReader, getTermBytes()); + assertEquals(2, cache.count()); + + // Close the reader, to be enqueued for cleanup + reader.close(); + // 1 out of 2 keys ie 50% are now stale. + assertEquals(1, cache.cacheCleanupManager.getStaleKeysCount().get()); + // cache count should not be affected + assertEquals(2, cache.count()); + + // clean cache with 51% staleness threshold + cache.cacheCleanupManager.cleanCache(); + // cleanup should have been ignored + assertEquals(1, cache.cacheCleanupManager.getStaleKeysCount().get()); + assertEquals(2, cache.count()); + + IOUtils.close(secondReader); + } + + // test the cleanupKeyToCountMap are set appropriately when both readers are closed + public void testCleanupKeyToCountMapAreSetAppropriately() throws Exception { + threadPool = getThreadPool(); + Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.51").build(); + cache = getIndicesRequestCache(settings); + + writer.addDocument(newDoc(0, "foo")); + ShardId shardId = indexShard.shardId(); + DirectoryReader reader = getReader(writer, shardId); + DirectoryReader secondReader = getReader(writer, shardId); + + // Get 2 entries into the cache from 2 different readers + cache.getOrCompute(getEntity(indexShard), getLoader(reader), reader, getTermBytes()); + assertEquals(1, cache.count()); + // test the mappings + ConcurrentMap> cleanupKeyToCountMap = cache.cacheCleanupManager.getCleanupKeyToCountMap(); + assertEquals(1, (int) cleanupKeyToCountMap.get(shardId).get(getReaderCacheKeyId(reader))); + + cache.getOrCompute(getEntity(indexShard), getLoader(secondReader), secondReader, getTermBytes()); + // test the mapping + assertEquals(2, cache.count()); + assertEquals(1, (int) cleanupKeyToCountMap.get(shardId).get(getReaderCacheKeyId(secondReader))); + // create another entry for the second reader + cache.getOrCompute(getEntity(indexShard), getLoader(secondReader), secondReader, getTermBytes("id", "1")); + // test the mapping + assertEquals(3, cache.count()); + assertEquals(2, (int) cleanupKeyToCountMap.get(shardId).get(getReaderCacheKeyId(secondReader))); + + // Close the reader, to create stale entries + reader.close(); + // cache count should not be affected + assertEquals(3, cache.count()); + // test the mapping, first reader's entry should be removed from the mapping and accounted for in the staleKeysCount + assertFalse(cleanupKeyToCountMap.get(shardId).containsKey(getReaderCacheKeyId(reader))); + assertEquals(1, cache.cacheCleanupManager.getStaleKeysCount().get()); + // second reader's mapping should not be affected + assertEquals(2, (int) cleanupKeyToCountMap.get(shardId).get(getReaderCacheKeyId(secondReader))); + // send removal notification for first reader + IndicesRequestCache.Key key = new IndicesRequestCache.Key(indexShard.shardId(), getTermBytes(), getReaderCacheKeyId(reader)); + cache.onRemoval( + new RemovalNotification, BytesReference>( + new ICacheKey<>(key), + getTermBytes(), + RemovalReason.EVICTED + ) + ); + // test the mapping, it should stay the same + assertFalse(cleanupKeyToCountMap.get(shardId).containsKey(getReaderCacheKeyId(reader))); + // staleKeysCount should be decremented + assertEquals(0, cache.cacheCleanupManager.getStaleKeysCount().get()); + // second reader's mapping should not be affected + assertEquals(2, (int) cleanupKeyToCountMap.get(shardId).get(getReaderCacheKeyId(secondReader))); + + // Without closing the secondReader send removal notification of one of its key + key = new IndicesRequestCache.Key(indexShard.shardId(), getTermBytes(), getReaderCacheKeyId(secondReader)); + cache.onRemoval( + new RemovalNotification, BytesReference>( + new ICacheKey<>(key), + getTermBytes(), + RemovalReason.EVICTED + ) + ); + // staleKeysCount should be the same as before + assertEquals(0, cache.cacheCleanupManager.getStaleKeysCount().get()); + // secondReader's readerCacheKeyId count should be decremented by 1 + assertEquals(1, (int) cleanupKeyToCountMap.get(shardId).get(getReaderCacheKeyId(secondReader))); + // Without closing the secondReader send removal notification of its last key + key = new IndicesRequestCache.Key(indexShard.shardId(), getTermBytes(), getReaderCacheKeyId(secondReader)); + cache.onRemoval( + new RemovalNotification, BytesReference>( + new ICacheKey<>(key), + getTermBytes(), + RemovalReason.EVICTED + ) + ); + // staleKeysCount should be the same as before + assertEquals(0, cache.cacheCleanupManager.getStaleKeysCount().get()); + // since all the readers of this shard is closed, the cleanupKeyToCountMap should have no entries + assertEquals(0, cleanupKeyToCountMap.size()); + + IOUtils.close(secondReader); + } + + private DirectoryReader getReader(IndexWriter writer, ShardId shardId) throws IOException { + return OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), shardId); + } + + private IndicesRequestCache getIndicesRequestCache(Settings settings) { + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + return new IndicesRequestCache(settings, (shardId -> { IndexService indexService = null; try { indexService = indicesService.indexServiceSafe(shardId.getIndex()); @@ -723,11 +930,71 @@ public void testCacheCleanupBasedOnStaleThreshold_StalenessLesserThanThreshold() threadPool, ClusterServiceUtils.createClusterService(threadPool) ); - Directory dir = newDirectory(); - IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); + } + + private Loader getLoader(DirectoryReader reader) { + return new Loader(reader, 0); + } + + private IndicesService.IndexShardCacheEntity getEntity(IndexShard indexShard) { + return new IndicesService.IndexShardCacheEntity(indexShard); + } + + private BytesReference getTermBytes() throws IOException { + TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); + return XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); + } + + private BytesReference getTermBytes(String fieldName, String value) throws IOException { + TermQueryBuilder termQuery = new TermQueryBuilder(fieldName, value); + return XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); + } + + private String getReaderCacheKeyId(DirectoryReader reader) { + OpenSearchDirectoryReader.DelegatingCacheHelper delegatingCacheHelper = (OpenSearchDirectoryReader.DelegatingCacheHelper) reader + .getReaderCacheHelper(); + return delegatingCacheHelper.getDelegatingCacheKey().getId(); + } + +<<<<<<< HEAD +======= + public void testClosingIndexWipesStats() throws Exception { + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + // Create two indices each with multiple shards + int numShards = 3; + Settings indexSettings = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numShards).build(); + String indexToKeepName = "test"; + String indexToCloseName = "test2"; + // delete all indices if already + assertAcked(client().admin().indices().prepareDelete("_all").get()); + IndexService indexToKeep = createIndex(indexToKeepName, indexSettings); + IndexService indexToClose = createIndex(indexToCloseName, indexSettings); + for (int i = 0; i < numShards; i++) { + // Check we can get all the shards we expect + assertNotNull(indexToKeep.getShard(i)); + assertNotNull(indexToClose.getShard(i)); + } + threadPool = getThreadPool(); + Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "0.001%").build(); + cache = new IndicesRequestCache(settings, (shardId -> { + IndexService indexService = null; + try { + indexService = indicesService.indexServiceSafe(shardId.getIndex()); + } catch (IndexNotFoundException ex) { + return Optional.empty(); + } + try { + return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); + } catch (ShardNotFoundException ex) { + return Optional.empty(); + } + }), + new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService(), + threadPool, + ClusterServiceUtils.createClusterService(threadPool) + ); writer.addDocument(newDoc(0, "foo")); - DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); if (randomBoolean()) { @@ -738,76 +1005,88 @@ public void testCacheCleanupBasedOnStaleThreshold_StalenessLesserThanThreshold() writer.updateDocument(new Term("id", "0"), newDoc(0, "bar")); DirectoryReader secondReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - // Get 2 entries into the cache - IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); - Loader loader = new Loader(reader, 0); - cache.getOrCompute(entity, loader, reader, termBytes); - - entity = new IndicesService.IndexShardCacheEntity(indexShard); - loader = new Loader(reader, 0); - cache.getOrCompute(entity, loader, reader, termBytes); - - IndicesService.IndexShardCacheEntity secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); - loader = new Loader(secondReader, 0); - cache.getOrCompute(entity, loader, secondReader, termBytes); - - secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); - loader = new Loader(secondReader, 0); - cache.getOrCompute(secondEntity, loader, secondReader, termBytes); - assertEquals(2, cache.count()); + List readersToClose = new ArrayList<>(); + List readersToKeep = new ArrayList<>(); + // Put entries into the cache for each shard + for (IndexService indexService : new IndexService[] { indexToKeep, indexToClose }) { + for (int i = 0; i < numShards; i++) { + IndexShard indexShard = indexService.getShard(i); + IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); + DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), indexShard.shardId()); + if (indexService == indexToClose) { + readersToClose.add(reader); + } else { + readersToKeep.add(reader); + } + Loader loader = new Loader(reader, 0); + cache.getOrCompute(entity, loader, reader, termBytes); + } + } - // Close the reader, to be enqueued for cleanup - // 1 out of 2 keys ie 50% are now stale. - reader.close(); - // cache count should not be affected - assertEquals(2, cache.count()); + // Check resulting stats + List> initialDimensionValues = new ArrayList<>(); + for (IndexService indexService : new IndexService[] { indexToKeep, indexToClose }) { + for (int i = 0; i < numShards; i++) { + ShardId shardId = indexService.getShard(i).shardId(); + List dimensionValues = List.of(shardId.getIndexName(), shardId.toString()); + initialDimensionValues.add(dimensionValues); + ImmutableCacheStats snapshot = cache.stats().getStatsForDimensionValues(dimensionValues); + assertNotNull(snapshot); + // check the values are not empty by confirming entries != 0, this should always be true since the missed value is loaded + // into the cache + assertNotEquals(0, snapshot.getEntries()); + } + } - // clean cache with 51% staleness threshold + // Delete an index + indexToClose.close("test_deletion", true); + // This actually closes the shards associated with the readers, which is necessary for cache cleanup logic + // In this UT, manually close the readers as well; could not figure out how to connect all this up in a UT so that + // we could get readers that were properly connected to an index's directory + for (DirectoryReader reader : readersToClose) { + IOUtils.close(reader); + } + // Trigger cache cleanup cache.cacheCleanupManager.cleanCache(); - // cleanup should have been ignored - assertEquals(2, cache.count()); - IOUtils.close(secondReader, writer, dir, cache); - terminate(threadPool); + // Now stats for the closed index should be gone + for (List dimensionValues : initialDimensionValues) { + ImmutableCacheStats snapshot = cache.stats().getStatsForDimensionValues(dimensionValues); + if (dimensionValues.get(0).equals(indexToCloseName)) { + assertNull(snapshot); + } else { + assertNotNull(snapshot); + // check the values are not empty by confirming entries != 0, this should always be true since the missed value is loaded + // into the cache + assertNotEquals(0, snapshot.getEntries()); + } + } + + for (DirectoryReader reader : readersToKeep) { + IOUtils.close(reader); + } + IOUtils.close(secondReader); } +>>>>>>> db361ecead1 ([Tiered Caching] Bug fix for IndicesRequestCache StaleKey management (#13070)) public void testEviction() throws Exception { final ByteSizeValue size; { - IndexShard indexShard = createIndex("test").getShard(0); - ThreadPool threadPool = getThreadPool(); - IndicesRequestCache cache = new IndicesRequestCache( - Settings.EMPTY, - (shardId -> Optional.of(new IndicesService.IndexShardCacheEntity(indexShard))), - new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService(), - threadPool, - ClusterServiceUtils.createClusterService(threadPool) - ); - Directory dir = newDirectory(); - IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); - + threadPool = getThreadPool(); + cache = getIndicesRequestCache(Settings.EMPTY); writer.addDocument(newDoc(0, "foo")); - DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); - BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); - IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); - Loader loader = new Loader(reader, 0); - + DirectoryReader reader = getReader(writer, indexShard.shardId()); writer.updateDocument(new Term("id", "0"), newDoc(0, "bar")); DirectoryReader secondReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - IndicesService.IndexShardCacheEntity secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); - Loader secondLoader = new Loader(secondReader, 0); - BytesReference value1 = cache.getOrCompute(entity, loader, reader, termBytes); + BytesReference value1 = cache.getOrCompute(getEntity(indexShard), getLoader(reader), reader, getTermBytes()); assertEquals("foo", value1.streamInput().readString()); - BytesReference value2 = cache.getOrCompute(secondEntity, secondLoader, secondReader, termBytes); + BytesReference value2 = cache.getOrCompute(getEntity(indexShard), getLoader(secondReader), secondReader, getTermBytes()); assertEquals("bar", value2.streamInput().readString()); size = indexShard.requestCache().stats().getMemorySize(); IOUtils.close(reader, secondReader, writer, dir, cache); - terminate(threadPool); } - IndexShard indexShard = createIndex("test1").getShard(0); - ThreadPool threadPool = getThreadPool(); + indexShard = createIndex("test1").getShard(0); IndicesRequestCache cache = new IndicesRequestCache( Settings.builder().put(IndicesRequestCache.INDICES_CACHE_QUERY_SIZE.getKey(), size.getBytes() + 1 + "b").build(), (shardId -> Optional.of(new IndicesService.IndexShardCacheEntity(indexShard))), @@ -815,83 +1094,52 @@ public void testEviction() throws Exception { threadPool, ClusterServiceUtils.createClusterService(threadPool) ); - Directory dir = newDirectory(); - IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); - + dir = newDirectory(); + writer = new IndexWriter(dir, newIndexWriterConfig()); writer.addDocument(newDoc(0, "foo")); - DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); - BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); - IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); - Loader loader = new Loader(reader, 0); - + DirectoryReader reader = getReader(writer, indexShard.shardId()); writer.updateDocument(new Term("id", "0"), newDoc(0, "bar")); - DirectoryReader secondReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - IndicesService.IndexShardCacheEntity secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); - Loader secondLoader = new Loader(secondReader, 0); - + DirectoryReader secondReader = getReader(writer, indexShard.shardId()); writer.updateDocument(new Term("id", "0"), newDoc(0, "baz")); DirectoryReader thirdReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - IndicesService.IndexShardCacheEntity thirddEntity = new IndicesService.IndexShardCacheEntity(indexShard); - Loader thirdLoader = new Loader(thirdReader, 0); - BytesReference value1 = cache.getOrCompute(entity, loader, reader, termBytes); + BytesReference value1 = cache.getOrCompute(getEntity(indexShard), getLoader(reader), reader, getTermBytes()); assertEquals("foo", value1.streamInput().readString()); - BytesReference value2 = cache.getOrCompute(secondEntity, secondLoader, secondReader, termBytes); + BytesReference value2 = cache.getOrCompute(getEntity(indexShard), getLoader(secondReader), secondReader, getTermBytes()); assertEquals("bar", value2.streamInput().readString()); logger.info("Memory size: {}", indexShard.requestCache().stats().getMemorySize()); - BytesReference value3 = cache.getOrCompute(thirddEntity, thirdLoader, thirdReader, termBytes); + BytesReference value3 = cache.getOrCompute(getEntity(indexShard), getLoader(thirdReader), thirdReader, getTermBytes()); assertEquals("baz", value3.streamInput().readString()); assertEquals(2, cache.count()); assertEquals(1, indexShard.requestCache().stats().getEvictions()); - IOUtils.close(reader, secondReader, thirdReader, writer, dir, cache); - terminate(threadPool); + IOUtils.close(reader, secondReader, thirdReader); } public void testClearAllEntityIdentity() throws Exception { - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexShard indexShard = createIndex("test").getShard(0); - ThreadPool threadPool = getThreadPool(); - IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, (shardId -> { - IndexService indexService = null; - try { - indexService = indicesService.indexServiceSafe(shardId.getIndex()); - } catch (IndexNotFoundException ex) { - return Optional.empty(); - } - return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); - }), - new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService(), - threadPool, - ClusterServiceUtils.createClusterService(threadPool) - ); - - Directory dir = newDirectory(); - IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); - + threadPool = getThreadPool(); + cache = getIndicesRequestCache(Settings.EMPTY); writer.addDocument(newDoc(0, "foo")); - DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); - BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); + DirectoryReader reader = getReader(writer, indexShard.shardId()); IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); Loader loader = new Loader(reader, 0); writer.updateDocument(new Term("id", "0"), newDoc(0, "bar")); - DirectoryReader secondReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); + DirectoryReader secondReader = getReader(writer, indexShard.shardId()); IndicesService.IndexShardCacheEntity secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); Loader secondLoader = new Loader(secondReader, 0); writer.updateDocument(new Term("id", "0"), newDoc(0, "baz")); - DirectoryReader thirdReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); + DirectoryReader thirdReader = getReader(writer, indexShard.shardId()); + ; IndicesService.IndexShardCacheEntity thirddEntity = new IndicesService.IndexShardCacheEntity(createIndex("test1").getShard(0)); Loader thirdLoader = new Loader(thirdReader, 0); - BytesReference value1 = cache.getOrCompute(entity, loader, reader, termBytes); + BytesReference value1 = cache.getOrCompute(entity, loader, reader, getTermBytes()); assertEquals("foo", value1.streamInput().readString()); - BytesReference value2 = cache.getOrCompute(secondEntity, secondLoader, secondReader, termBytes); + BytesReference value2 = cache.getOrCompute(secondEntity, secondLoader, secondReader, getTermBytes()); assertEquals("bar", value2.streamInput().readString()); logger.info("Memory size: {}", indexShard.requestCache().stats().getMemorySize()); - BytesReference value3 = cache.getOrCompute(thirddEntity, thirdLoader, thirdReader, termBytes); + BytesReference value3 = cache.getOrCompute(thirddEntity, thirdLoader, thirdReader, getTermBytes()); assertEquals("baz", value3.streamInput().readString()); assertEquals(3, cache.count()); RequestCacheStats requestCacheStats = entity.stats().stats(); @@ -902,14 +1150,13 @@ public void testClearAllEntityIdentity() throws Exception { cache.cacheCleanupManager.cleanCache(); assertEquals(1, cache.count()); // third has not been validated since it's a different identity - value3 = cache.getOrCompute(thirddEntity, thirdLoader, thirdReader, termBytes); + value3 = cache.getOrCompute(thirddEntity, thirdLoader, thirdReader, getTermBytes()); requestCacheStats = entity.stats().stats(); requestCacheStats.add(thirddEntity.stats().stats()); assertEquals(hitCount + 1, requestCacheStats.getHitCount()); assertEquals("baz", value3.streamInput().readString()); - IOUtils.close(reader, secondReader, thirdReader, writer, dir, cache); - terminate(threadPool); + IOUtils.close(reader, secondReader, thirdReader); } public Iterable newDoc(int id, String value) { @@ -921,7 +1168,7 @@ public Iterable newDoc(int id, String value) { private static class Loader implements CheckedSupplier { - private final DirectoryReader reader; + final DirectoryReader reader; private final int id; public boolean loadedFromCache = true; @@ -945,38 +1192,18 @@ public BytesReference get() { throw new RuntimeException(e); } } - } public void testInvalidate() throws Exception { - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndexShard indexShard = createIndex("test").getShard(0); - ThreadPool threadPool = getThreadPool(); - IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, (shardId -> { - IndexService indexService = null; - try { - indexService = indicesService.indexServiceSafe(shardId.getIndex()); - } catch (IndexNotFoundException ex) { - return Optional.empty(); - } - return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); - }), - new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService(), - threadPool, - ClusterServiceUtils.createClusterService(threadPool) - ); - Directory dir = newDirectory(); - IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); - + threadPool = getThreadPool(); + IndicesRequestCache cache = getIndicesRequestCache(Settings.EMPTY); writer.addDocument(newDoc(0, "foo")); - DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); - BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); + DirectoryReader reader = getReader(writer, indexShard.shardId()); // initial cache IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); Loader loader = new Loader(reader, 0); - BytesReference value = cache.getOrCompute(entity, loader, reader, termBytes); + BytesReference value = cache.getOrCompute(entity, loader, reader, getTermBytes()); assertEquals("foo", value.streamInput().readString()); ShardRequestCache requestCacheStats = entity.stats(); assertEquals(0, requestCacheStats.stats().getHitCount()); @@ -988,7 +1215,7 @@ public void testInvalidate() throws Exception { // cache hit entity = new IndicesService.IndexShardCacheEntity(indexShard); loader = new Loader(reader, 0); - value = cache.getOrCompute(entity, loader, reader, termBytes); + value = cache.getOrCompute(entity, loader, reader, getTermBytes()); assertEquals("foo", value.streamInput().readString()); requestCacheStats = entity.stats(); assertEquals(1, requestCacheStats.stats().getHitCount()); @@ -1002,8 +1229,8 @@ public void testInvalidate() throws Exception { // load again after invalidate entity = new IndicesService.IndexShardCacheEntity(indexShard); loader = new Loader(reader, 0); - cache.invalidate(entity, reader, termBytes); - value = cache.getOrCompute(entity, loader, reader, termBytes); + cache.invalidate(entity, reader, getTermBytes()); + value = cache.getOrCompute(entity, loader, reader, getTermBytes()); assertEquals("foo", value.streamInput().readString()); requestCacheStats = entity.stats(); assertEquals(1, requestCacheStats.stats().getHitCount()); @@ -1028,16 +1255,11 @@ public void testInvalidate() throws Exception { assertEquals(0, cache.count()); assertEquals(0, requestCacheStats.stats().getMemorySize().bytesAsInt()); - IOUtils.close(reader, writer, dir, cache); - terminate(threadPool); + IOUtils.close(reader); assertEquals(0, cache.numRegisteredCloseListeners()); } public void testEqualsKey() throws IOException { - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - Directory dir = newDirectory(); - IndexWriterConfig config = newIndexWriterConfig(); - IndexWriter writer = new IndexWriter(dir, config); ShardId shardId = new ShardId("foo", "bar", 1); ShardId shardId1 = new ShardId("foo1", "bar1", 2); IndexReader reader1 = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), shardId); @@ -1064,13 +1286,9 @@ public void testEqualsKey() throws IOException { } public void testSerializationDeserializationOfCacheKey() throws Exception { - TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); - BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); - IndexService indexService = createIndex("test"); - IndexShard indexShard = indexService.getShard(0); IndicesService.IndexShardCacheEntity shardCacheEntity = new IndicesService.IndexShardCacheEntity(indexShard); String readerCacheKeyId = UUID.randomUUID().toString(); - IndicesRequestCache.Key key1 = new IndicesRequestCache.Key(indexShard.shardId(), termBytes, readerCacheKeyId); + IndicesRequestCache.Key key1 = new IndicesRequestCache.Key(indexShard.shardId(), getTermBytes(), readerCacheKeyId); BytesReference bytesReference = null; try (BytesStreamOutput out = new BytesStreamOutput()) { key1.writeTo(out); @@ -1082,8 +1300,7 @@ public void testSerializationDeserializationOfCacheKey() throws Exception { assertEquals(readerCacheKeyId, key2.readerCacheKeyId); assertEquals(((IndexShard) shardCacheEntity.getCacheIdentity()).shardId(), key2.shardId); - assertEquals(termBytes, key2.value); - + assertEquals(getTermBytes(), key2.value); } private class TestBytesReference extends AbstractBytesReference {