diff --git a/CHANGELOG.md b/CHANGELOG.md index 5717d65014636..cd695fe934236 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -75,6 +75,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Fix double invocation of postCollection when MultiBucketCollector is present ([#14015](https://github.com/opensearch-project/OpenSearch/pull/14015)) - Fix ReplicaShardBatchAllocator to batch shards without duplicates ([#13710](https://github.com/opensearch-project/OpenSearch/pull/13710)) - Java high-level REST client bulk() is not respecting the bulkRequest.requireAlias(true) method call ([#14146](https://github.com/opensearch-project/OpenSearch/pull/14146)) +- Fix ShardNotFoundException during request cache clean up ([#14219](https://github.com/opensearch-project/OpenSearch/pull/14219)) ### Security diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index d0e2e30f3d531..6fd5794cf29f2 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -153,7 +153,8 @@ public final class IndicesRequestCache implements RemovalListener cache; private final ClusterService clusterService; - private final Function> cacheEntityLookup; + // pkg-private for testing + final Function> cacheEntityLookup; // pkg-private for testing final IndicesRequestCacheCleanupManager cacheCleanupManager; diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index ce8d23db87c30..8c02120225897 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -406,7 +406,7 @@ public IndicesService( if (indexService == null) { return Optional.empty(); } - return Optional.of(new IndexShardCacheEntity(indexService.getShard(shardId.id()))); + return Optional.of(new IndexShardCacheEntity(indexService.getShardOrNull(shardId.id()))); }), cacheService, threadPool, clusterService); this.indicesQueryCache = new IndicesQueryCache(settings); this.mapperRegistry = mapperRegistry; diff --git a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java index dcddd9f3d1318..9dbdddb76ea24 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java @@ -105,6 +105,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import static java.util.Collections.emptyMap; @@ -798,15 +799,9 @@ private DirectoryReader getReader(IndexWriter writer, ShardId shardId) throws IO private IndicesRequestCache getIndicesRequestCache(Settings settings) { IndicesService indicesService = getInstanceFromNode(IndicesService.class); - return new IndicesRequestCache(settings, (shardId -> { - IndexService indexService = null; - try { - indexService = indicesService.indexServiceSafe(shardId.getIndex()); - } catch (IndexNotFoundException ex) { - return Optional.empty(); - } - return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); - }), + return new IndicesRequestCache( + settings, + indicesService.indicesRequestCache.cacheEntityLookup, new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService(), threadPool, ClusterServiceUtils.createClusterService(threadPool) @@ -1419,6 +1414,55 @@ public void testDeleteAndCreateIndexShardOnSameNodeAndVerifyStats() throws Excep IOUtils.close(reader, writer, dir, cache); } + public void testIndexShardClosedAndVerifyCacheCleanUpWorksSuccessfully() throws Exception { + threadPool = getThreadPool(); + String indexName = "test1"; + // Create a shard + IndexService indexService = createIndex( + indexName, + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build() + ); + IndexShard indexShard = indexService.getShard(0); + Directory dir = newDirectory(); + IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); + writer.addDocument(newDoc(0, "foo")); + writer.addDocument(newDoc(1, "hack")); + DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), indexShard.shardId()); + Loader loader = new Loader(reader, 0); + + // Set clean interval to a high value as we will do it manually here. + IndicesRequestCache cache = getIndicesRequestCache( + Settings.builder() + .put(IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_INTERVAL_SETTING_KEY, TimeValue.timeValueMillis(100000)) + .build() + ); + IndicesService.IndexShardCacheEntity cacheEntity = new IndicesService.IndexShardCacheEntity(indexShard); + TermQueryBuilder termQuery = new TermQueryBuilder("id", "bar"); + + // Cache some values for indexShard + BytesReference value = cache.getOrCompute(cacheEntity, loader, reader, getTermBytes()); + + // Verify response and stats. + assertEquals("foo", value.streamInput().readString()); + RequestCacheStats stats = indexShard.requestCache().stats(); + assertEquals("foo", value.streamInput().readString()); + assertEquals(1, cache.count()); + assertEquals(1, stats.getMissCount()); + assertTrue(stats.getMemorySizeInBytes() > 0); + + // Remove the shard making its cache entries stale + IOUtils.close(reader, writer, dir); + indexService.removeShard(0, "force"); + + assertBusy(() -> { assertEquals(IndexShardState.CLOSED, indexShard.state()); }, 1, TimeUnit.SECONDS); + + // Trigger clean up of cache. Should not throw any exception. + cache.cacheCleanupManager.cleanCache(); + // Verify all cleared up. + assertEquals(0, cache.count()); + IOUtils.close(cache); + } + public static String generateString(int length) { String characters = "abcdefghijklmnopqrstuvwxyz"; StringBuilder sb = new StringBuilder(length);