Skip to content

Commit

Permalink
Fix ShardNotFoundException during request cache clean up (opensearch-…
Browse files Browse the repository at this point in the history
…project#14219) (opensearch-project#14232)

* Fix for ShardNotFoundException during request cache clean up

* Added changelog

* Fix forbidden gradle check

---------

(cherry picked from commit 1593878)

Signed-off-by: Sagar Upadhyaya <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Signed-off-by: kkewwei <[email protected]>
  • Loading branch information
2 people authored and kkewwei committed Jul 24, 2024
1 parent c001187 commit 4cecb70
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix double invocation of postCollection when MultiBucketCollector is present ([#14015](https://github.com/opensearch-project/OpenSearch/pull/14015))
- Fix ReplicaShardBatchAllocator to batch shards without duplicates ([#13710](https://github.com/opensearch-project/OpenSearch/pull/13710))
- Java high-level REST client bulk() is not respecting the bulkRequest.requireAlias(true) method call ([#14146](https://github.com/opensearch-project/OpenSearch/pull/14146))
- Fix ShardNotFoundException during request cache clean up ([#14219](https://github.com/opensearch-project/OpenSearch/pull/14219))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,8 @@ public final class IndicesRequestCache implements RemovalListener<ICacheKey<Indi
private final TimeValue expire;
private final ICache<Key, BytesReference> cache;
private final ClusterService clusterService;
private final Function<ShardId, Optional<CacheEntity>> cacheEntityLookup;
// pkg-private for testing
final Function<ShardId, Optional<CacheEntity>> cacheEntityLookup;
// pkg-private for testing
final IndicesRequestCacheCleanupManager cacheCleanupManager;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ public IndicesService(
if (indexService == null) {
return Optional.empty();
}
return Optional.of(new IndexShardCacheEntity(indexService.getShard(shardId.id())));
return Optional.of(new IndexShardCacheEntity(indexService.getShardOrNull(shardId.id())));
}), cacheService, threadPool, clusterService);
this.indicesQueryCache = new IndicesQueryCache(settings);
this.mapperRegistry = mapperRegistry;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static java.util.Collections.emptyMap;
Expand Down Expand Up @@ -798,15 +799,9 @@ private DirectoryReader getReader(IndexWriter writer, ShardId shardId) throws IO

private IndicesRequestCache getIndicesRequestCache(Settings settings) {
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
return new IndicesRequestCache(settings, (shardId -> {
IndexService indexService = null;
try {
indexService = indicesService.indexServiceSafe(shardId.getIndex());
} catch (IndexNotFoundException ex) {
return Optional.empty();
}
return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id())));
}),
return new IndicesRequestCache(
settings,
indicesService.indicesRequestCache.cacheEntityLookup,
new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService(),
threadPool,
ClusterServiceUtils.createClusterService(threadPool)
Expand Down Expand Up @@ -1419,6 +1414,55 @@ public void testDeleteAndCreateIndexShardOnSameNodeAndVerifyStats() throws Excep
IOUtils.close(reader, writer, dir, cache);
}

public void testIndexShardClosedAndVerifyCacheCleanUpWorksSuccessfully() throws Exception {
threadPool = getThreadPool();
String indexName = "test1";
// Create a shard
IndexService indexService = createIndex(
indexName,
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build()
);
IndexShard indexShard = indexService.getShard(0);
Directory dir = newDirectory();
IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig());
writer.addDocument(newDoc(0, "foo"));
writer.addDocument(newDoc(1, "hack"));
DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), indexShard.shardId());
Loader loader = new Loader(reader, 0);

// Set clean interval to a high value as we will do it manually here.
IndicesRequestCache cache = getIndicesRequestCache(
Settings.builder()
.put(IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_INTERVAL_SETTING_KEY, TimeValue.timeValueMillis(100000))
.build()
);
IndicesService.IndexShardCacheEntity cacheEntity = new IndicesService.IndexShardCacheEntity(indexShard);
TermQueryBuilder termQuery = new TermQueryBuilder("id", "bar");

// Cache some values for indexShard
BytesReference value = cache.getOrCompute(cacheEntity, loader, reader, getTermBytes());

// Verify response and stats.
assertEquals("foo", value.streamInput().readString());
RequestCacheStats stats = indexShard.requestCache().stats();
assertEquals("foo", value.streamInput().readString());
assertEquals(1, cache.count());
assertEquals(1, stats.getMissCount());
assertTrue(stats.getMemorySizeInBytes() > 0);

// Remove the shard making its cache entries stale
IOUtils.close(reader, writer, dir);
indexService.removeShard(0, "force");

assertBusy(() -> { assertEquals(IndexShardState.CLOSED, indexShard.state()); }, 1, TimeUnit.SECONDS);

// Trigger clean up of cache. Should not throw any exception.
cache.cacheCleanupManager.cleanCache();
// Verify all cleared up.
assertEquals(0, cache.count());
IOUtils.close(cache);
}

public static String generateString(int length) {
String characters = "abcdefghijklmnopqrstuvwxyz";
StringBuilder sb = new StringBuilder(length);
Expand Down

0 comments on commit 4cecb70

Please sign in to comment.