From f051604501de3ffdbe638df594e797382d72d504 Mon Sep 17 00:00:00 2001 From: Sagar Upadhyaya Date: Fri, 5 Jan 2024 23:19:36 -0800 Subject: [PATCH] Using shardId as cache entity in IndicesRequestCache key Signed-off-by: Sagar Upadhyaya --- .../opensearch/core/index/shard/ShardId.java | 7 + .../indices/IndicesRequestCache.java | 47 ++--- .../opensearch/indices/IndicesService.java | 31 +-- .../indices/IndicesRequestCacheTests.java | 181 +++++++++--------- .../indices/IndicesServiceCloseTests.java | 23 +-- 5 files changed, 129 insertions(+), 160 deletions(-) diff --git a/libs/core/src/main/java/org/opensearch/core/index/shard/ShardId.java b/libs/core/src/main/java/org/opensearch/core/index/shard/ShardId.java index c0abad7ed727f..1e48cf1f476da 100644 --- a/libs/core/src/main/java/org/opensearch/core/index/shard/ShardId.java +++ b/libs/core/src/main/java/org/opensearch/core/index/shard/ShardId.java @@ -32,6 +32,7 @@ package org.opensearch.core.index.shard; +import org.apache.lucene.util.RamUsageEstimator; import org.opensearch.common.annotation.PublicApi; import org.opensearch.core.common.Strings; import org.opensearch.core.common.io.stream.StreamInput; @@ -55,6 +56,8 @@ public class ShardId implements Comparable, ToXContentFragment, Writeab private final int shardId; private final int hashCode; + private final static long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(ShardId.class); + /** * Constructs a new shard id. * @param index the index name @@ -88,6 +91,10 @@ public ShardId(StreamInput in) throws IOException { hashCode = computeHashCode(); } + public long getBaseRamBytesUsed() { + return BASE_RAM_BYTES_USED; + } + /** * Writes this shard id to a stream. * @param out the stream to write to diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 42d7d973d09b8..bba6217c504a9 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -55,6 +55,8 @@ import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.core.common.unit.ByteSizeValue; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.index.shard.IndexShard; import java.io.Closeable; import java.io.IOException; @@ -65,6 +67,7 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentMap; +import java.util.function.Function; /** * The indices request cache allows to cache a shard level request stage responses, helping with improving @@ -113,9 +116,9 @@ public final class IndicesRequestCache implements RemovalListener cache; - private final IndicesService indicesService; + private final Function cacheEntityFunction; - IndicesRequestCache(Settings settings, IndicesService indicesService) { + IndicesRequestCache(Settings settings, Function cacheEntityFunction) { this.size = INDICES_CACHE_QUERY_SIZE.get(settings); this.expire = INDICES_CACHE_QUERY_EXPIRE.exists(settings) ? INDICES_CACHE_QUERY_EXPIRE.get(settings) : null; long sizeInBytes = size.getBytes(); @@ -127,7 +130,7 @@ public final class IndicesRequestCache implements RemovalListener notification) { - notification.getKey().entity.onRemoval(notification); + cacheEntityFunction.apply(notification.getKey().shardId).onRemoval(notification); } BytesReference getOrCompute( - CacheEntity cacheEntity, + IndicesService.IndexShardCacheEntity cacheEntity, CheckedSupplier loader, DirectoryReader reader, BytesReference cacheKey @@ -158,11 +161,11 @@ BytesReference getOrCompute( .getReaderCacheHelper(); String readerCacheKeyId = delegatingCacheHelper.getDelegatingCacheKey().getId(); assert readerCacheKeyId != null; - final Key key = new Key(cacheEntity, cacheKey, readerCacheKeyId); + final Key key = new Key(((IndexShard) cacheEntity.getCacheIdentity()).shardId(), cacheKey, readerCacheKeyId); Loader cacheLoader = new Loader(cacheEntity, loader); BytesReference value = cache.computeIfAbsent(key, cacheLoader); if (cacheLoader.isLoaded()) { - key.entity.onMiss(); + cacheEntity.onMiss(); // see if its the first time we see this reader, and make sure to register a cleanup key CleanupKey cleanupKey = new CleanupKey(cacheEntity, readerCacheKeyId); if (!registeredClosedListeners.containsKey(cleanupKey)) { @@ -172,7 +175,7 @@ BytesReference getOrCompute( } } } else { - key.entity.onHit(); + cacheEntity.onHit(); } return value; } @@ -183,14 +186,14 @@ BytesReference getOrCompute( * @param reader the reader to invalidate the cache entry for * @param cacheKey the cache key to invalidate */ - void invalidate(CacheEntity cacheEntity, DirectoryReader reader, BytesReference cacheKey) { + void invalidate(IndicesService.IndexShardCacheEntity cacheEntity, DirectoryReader reader, BytesReference cacheKey) { assert reader.getReaderCacheHelper() != null; String readerCacheKeyId = null; if (reader instanceof OpenSearchDirectoryReader) { IndexReader.CacheHelper cacheHelper = ((OpenSearchDirectoryReader) reader).getDelegatingCacheHelper(); readerCacheKeyId = ((OpenSearchDirectoryReader.DelegatingCacheHelper) cacheHelper).getDelegatingCacheKey().getId(); } - cache.invalidate(new Key(cacheEntity, cacheKey, readerCacheKeyId)); + cache.invalidate(new Key(((IndexShard) cacheEntity.getCacheIdentity()).shardId(), cacheKey, readerCacheKeyId)); } /** @@ -225,7 +228,7 @@ public BytesReference load(Key key) throws Exception { /** * Basic interface to make this cache testable. */ - interface CacheEntity extends Accountable, Writeable { + interface CacheEntity extends Accountable { /** * Called after the value was loaded. @@ -266,26 +269,26 @@ interface CacheEntity extends Accountable, Writeable { * * @opensearch.internal */ - class Key implements Accountable, Writeable { - public final CacheEntity entity; // use as identity equality + static class Key implements Accountable, Writeable { + public final ShardId shardId; // use as identity equality public final String readerCacheKeyId; public final BytesReference value; - Key(CacheEntity entity, BytesReference value, String readerCacheKeyId) { - this.entity = entity; + Key(ShardId shardId, BytesReference value, String readerCacheKeyId) { + this.shardId = shardId; this.value = value; this.readerCacheKeyId = Objects.requireNonNull(readerCacheKeyId); } Key(StreamInput in) throws IOException { - this.entity = in.readOptionalWriteable(in1 -> indicesService.new IndexShardCacheEntity(in1)); + this.shardId = in.readOptionalWriteable(ShardId::new); this.readerCacheKeyId = in.readOptionalString(); this.value = in.readBytesReference(); } @Override public long ramBytesUsed() { - return BASE_RAM_BYTES_USED + entity.ramBytesUsed() + value.length(); + return BASE_RAM_BYTES_USED + shardId.getBaseRamBytesUsed() + value.length(); } @Override @@ -300,14 +303,14 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) return false; Key key = (Key) o; if (!Objects.equals(readerCacheKeyId, key.readerCacheKeyId)) return false; - if (!entity.getCacheIdentity().equals(key.entity.getCacheIdentity())) return false; + if (!shardId.equals(key.shardId)) return false; if (!value.equals(key.value)) return false; return true; } @Override public int hashCode() { - int result = entity.getCacheIdentity().hashCode(); + int result = shardId.hashCode(); result = 31 * result + readerCacheKeyId.hashCode(); result = 31 * result + value.hashCode(); return result; @@ -315,7 +318,7 @@ public int hashCode() { @Override public void writeTo(StreamOutput out) throws IOException { - out.writeOptionalWriteable(entity); + out.writeOptionalWriteable(shardId); out.writeOptionalString(readerCacheKeyId); out.writeBytesReference(value); } @@ -376,10 +379,10 @@ synchronized void cleanCache() { if (!currentKeysToClean.isEmpty() || !currentFullClean.isEmpty()) { for (Iterator iterator = cache.keys().iterator(); iterator.hasNext();) { Key key = iterator.next(); - if (currentFullClean.contains(key.entity.getCacheIdentity())) { + if (currentFullClean.contains(cacheEntityFunction.apply(key.shardId).getCacheIdentity())) { iterator.remove(); } else { - if (currentKeysToClean.contains(new CleanupKey(key.entity, key.readerCacheKeyId))) { + if (currentKeysToClean.contains(new CleanupKey(cacheEntityFunction.apply(key.shardId), key.readerCacheKeyId))) { iterator.remove(); } } diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index 215a13abccf78..635fe2a07ed2d 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -178,6 +178,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -193,7 +194,6 @@ import java.util.stream.Collectors; import static java.util.Collections.emptyList; -import static java.util.Collections.emptyMap; import static java.util.Collections.unmodifiableMap; import static org.opensearch.common.collect.MapBuilder.newMapBuilder; import static org.opensearch.common.util.concurrent.OpenSearchExecutors.daemonThreadFactory; @@ -301,8 +301,6 @@ public class IndicesService extends AbstractLifecycleComponent Property.Final ); - private static long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(IndexShardCacheEntity.class); - /** * If enabled, this setting enforces that indexes will be created with a replication type matching the cluster setting * defined in cluster.indices.replication.strategy by rejecting any request that specifies a replication type that @@ -335,7 +333,7 @@ public class IndicesService extends AbstractLifecycleComponent private final ScriptService scriptService; private final ClusterService clusterService; private final Client client; - private volatile Map indices = emptyMap(); + private volatile Map indices = new ConcurrentHashMap<>(); private final Map> pendingDeletes = new HashMap<>(); private final AtomicInteger numUncompletedDeletes = new AtomicInteger(); private final OldShardsStats oldShardsStats = new OldShardsStats(); @@ -411,7 +409,10 @@ public IndicesService( this.shardsClosedTimeout = settings.getAsTime(INDICES_SHARDS_CLOSED_TIMEOUT, new TimeValue(1, TimeUnit.DAYS)); this.analysisRegistry = analysisRegistry; this.indexNameExpressionResolver = indexNameExpressionResolver; - this.indicesRequestCache = new IndicesRequestCache(settings, this); + this.indicesRequestCache = new IndicesRequestCache(settings, (shardId -> { + IndexService indexService = indexServiceSafe(shardId.getIndex()); + return new IndexShardCacheEntity(indexService.getShard(shardId.id())); + })); this.indicesQueryCache = new IndicesQueryCache(settings); this.mapperRegistry = mapperRegistry; this.namedWriteableRegistry = namedWriteableRegistry; @@ -1746,7 +1747,6 @@ private BytesReference cacheShardLevelResult( BytesReference cacheKey, CheckedConsumer loader ) throws Exception { - IndexShardCacheEntity cacheEntity = new IndexShardCacheEntity(shard); CheckedSupplier supplier = () -> { /* BytesStreamOutput allows to pass the expected size but by default uses * BigArrays.PAGE_SIZE_IN_BYTES which is 16k. A common cached result ie. @@ -1763,7 +1763,7 @@ private BytesReference cacheShardLevelResult( return out.bytes(); } }; - return indicesRequestCache.getOrCompute(cacheEntity, supplier, reader, cacheKey); + return indicesRequestCache.getOrCompute(new IndexShardCacheEntity(shard), supplier, reader, cacheKey); } /** @@ -1771,20 +1771,15 @@ private BytesReference cacheShardLevelResult( * * @opensearch.internal */ - public final class IndexShardCacheEntity extends AbstractIndexShardCacheEntity { + public static class IndexShardCacheEntity extends AbstractIndexShardCacheEntity { + + private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(IndexShardCacheEntity.class); private final IndexShard indexShard; public IndexShardCacheEntity(IndexShard indexShard) { this.indexShard = indexShard; } - public IndexShardCacheEntity(StreamInput in) throws IOException { - Index index = in.readOptionalWriteable(Index::new); - int shardId = in.readVInt(); - IndexService indexService = indices.get(index.getUUID()); - this.indexShard = Optional.ofNullable(indexService).map(indexService1 -> indexService1.getShard(shardId)).orElse(null); - } - @Override protected ShardRequestCache stats() { return indexShard.requestCache(); @@ -1806,12 +1801,6 @@ public long ramBytesUsed() { // across many entities return BASE_RAM_BYTES_USED; } - - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeOptionalWriteable(indexShard.shardId().getIndex()); - out.writeVInt(indexShard.shardId().id()); - } } @FunctionalInterface diff --git a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java index 18ec013711f22..3ae9e55c8ad7a 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java @@ -53,27 +53,33 @@ import org.opensearch.core.common.bytes.AbstractBytesReference; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.common.io.stream.StreamInput; -import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.xcontent.MediaTypeRegistry; import org.opensearch.core.xcontent.XContentHelper; import org.opensearch.index.IndexService; +import org.opensearch.index.cache.request.RequestCacheStats; import org.opensearch.index.cache.request.ShardRequestCache; import org.opensearch.index.query.TermQueryBuilder; import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.IndexShardState; import org.opensearch.test.OpenSearchSingleNodeTestCase; import java.io.IOException; import java.util.Arrays; import java.util.UUID; -import java.util.concurrent.atomic.AtomicBoolean; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class IndicesRequestCacheTests extends OpenSearchSingleNodeTestCase { public void testBasicOperationsCache() throws Exception { - ShardRequestCache requestCacheStats = new ShardRequestCache(); - IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, getInstanceFromNode(IndicesService.class)); + IndexShard indexShard = createIndex("test").getShard(0); + IndicesRequestCache cache = new IndicesRequestCache( + Settings.EMPTY, + (shardId -> new IndicesService.IndexShardCacheEntity(indexShard)) + ); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -81,13 +87,13 @@ public void testBasicOperationsCache() throws Exception { DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); - AtomicBoolean indexShard = new AtomicBoolean(true); // initial cache - TestEntity entity = new TestEntity(requestCacheStats, indexShard); + IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); Loader loader = new Loader(reader, 0); BytesReference value = cache.getOrCompute(entity, loader, reader, termBytes); assertEquals("foo", value.streamInput().readString()); + ShardRequestCache requestCacheStats = indexShard.requestCache(); assertEquals(0, requestCacheStats.stats().getHitCount()); assertEquals(1, requestCacheStats.stats().getMissCount()); assertEquals(0, requestCacheStats.stats().getEvictions()); @@ -95,10 +101,11 @@ public void testBasicOperationsCache() throws Exception { assertEquals(1, cache.count()); // cache hit - entity = new TestEntity(requestCacheStats, indexShard); + entity = new IndicesService.IndexShardCacheEntity(indexShard); loader = new Loader(reader, 0); value = cache.getOrCompute(entity, loader, reader, termBytes); assertEquals("foo", value.streamInput().readString()); + requestCacheStats = indexShard.requestCache(); assertEquals(1, requestCacheStats.stats().getHitCount()); assertEquals(1, requestCacheStats.stats().getMissCount()); assertEquals(0, requestCacheStats.stats().getEvictions()); @@ -111,7 +118,7 @@ public void testBasicOperationsCache() throws Exception { if (randomBoolean()) { reader.close(); } else { - indexShard.set(false); // closed shard but reader is still open + indexShard.close("test", true, true); // closed shard but reader is still open cache.clear(entity); } cache.cleanCache(); @@ -127,9 +134,12 @@ public void testBasicOperationsCache() throws Exception { } public void testCacheDifferentReaders() throws Exception { - IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, getInstanceFromNode(IndicesService.class)); - AtomicBoolean indexShard = new AtomicBoolean(true); - ShardRequestCache requestCacheStats = new ShardRequestCache(); + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexShard indexShard = createIndex("test").getShard(0); + IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, (shardId -> { + IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); + return new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id())); + })); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -146,9 +156,10 @@ public void testCacheDifferentReaders() throws Exception { DirectoryReader secondReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); // initial cache - TestEntity entity = new TestEntity(requestCacheStats, indexShard); + IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); Loader loader = new Loader(reader, 0); BytesReference value = cache.getOrCompute(entity, loader, reader, termBytes); + ShardRequestCache requestCacheStats = entity.stats(); assertEquals("foo", value.streamInput().readString()); assertEquals(0, requestCacheStats.stats().getHitCount()); assertEquals(1, requestCacheStats.stats().getMissCount()); @@ -160,9 +171,10 @@ public void testCacheDifferentReaders() throws Exception { assertEquals(1, cache.numRegisteredCloseListeners()); // cache the second - TestEntity secondEntity = new TestEntity(requestCacheStats, indexShard); + IndicesService.IndexShardCacheEntity secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); loader = new Loader(secondReader, 0); value = cache.getOrCompute(entity, loader, secondReader, termBytes); + requestCacheStats = entity.stats(); assertEquals("bar", value.streamInput().readString()); assertEquals(0, requestCacheStats.stats().getHitCount()); assertEquals(2, requestCacheStats.stats().getMissCount()); @@ -172,9 +184,10 @@ public void testCacheDifferentReaders() throws Exception { assertTrue(requestCacheStats.stats().getMemorySize().bytesAsInt() > cacheSize + value.length()); assertEquals(2, cache.numRegisteredCloseListeners()); - secondEntity = new TestEntity(requestCacheStats, indexShard); + secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); loader = new Loader(secondReader, 0); value = cache.getOrCompute(secondEntity, loader, secondReader, termBytes); + requestCacheStats = entity.stats(); assertEquals("bar", value.streamInput().readString()); assertEquals(1, requestCacheStats.stats().getHitCount()); assertEquals(2, requestCacheStats.stats().getMissCount()); @@ -182,10 +195,11 @@ public void testCacheDifferentReaders() throws Exception { assertTrue(loader.loadedFromCache); assertEquals(2, cache.count()); - entity = new TestEntity(requestCacheStats, indexShard); + entity = new IndicesService.IndexShardCacheEntity(indexShard); loader = new Loader(reader, 0); value = cache.getOrCompute(entity, loader, reader, termBytes); assertEquals("foo", value.streamInput().readString()); + requestCacheStats = entity.stats(); assertEquals(2, requestCacheStats.stats().getHitCount()); assertEquals(2, requestCacheStats.stats().getMissCount()); assertEquals(0, requestCacheStats.stats().getEvictions()); @@ -206,7 +220,7 @@ public void testCacheDifferentReaders() throws Exception { if (randomBoolean()) { secondReader.close(); } else { - indexShard.set(false); // closed shard but reader is still open + indexShard.close("test", true, true); // closed shard but reader is still open cache.clear(secondEntity); } cache.cleanCache(); @@ -223,9 +237,11 @@ public void testCacheDifferentReaders() throws Exception { public void testEviction() throws Exception { final ByteSizeValue size; { - IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, getInstanceFromNode(IndicesService.class)); - AtomicBoolean indexShard = new AtomicBoolean(true); - ShardRequestCache requestCacheStats = new ShardRequestCache(); + IndexShard indexShard = createIndex("test").getShard(0); + IndicesRequestCache cache = new IndicesRequestCache( + Settings.EMPTY, + (shardId -> new IndicesService.IndexShardCacheEntity(indexShard)) + ); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -233,27 +249,26 @@ public void testEviction() throws Exception { DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); - TestEntity entity = new TestEntity(requestCacheStats, indexShard); + IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); Loader loader = new Loader(reader, 0); writer.updateDocument(new Term("id", "0"), newDoc(0, "bar")); DirectoryReader secondReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - TestEntity secondEntity = new TestEntity(requestCacheStats, indexShard); + IndicesService.IndexShardCacheEntity secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); Loader secondLoader = new Loader(secondReader, 0); BytesReference value1 = cache.getOrCompute(entity, loader, reader, termBytes); assertEquals("foo", value1.streamInput().readString()); BytesReference value2 = cache.getOrCompute(secondEntity, secondLoader, secondReader, termBytes); assertEquals("bar", value2.streamInput().readString()); - size = requestCacheStats.stats().getMemorySize(); + size = indexShard.requestCache().stats().getMemorySize(); IOUtils.close(reader, secondReader, writer, dir, cache); } + IndexShard indexShard = createIndex("test1").getShard(0); IndicesRequestCache cache = new IndicesRequestCache( Settings.builder().put(IndicesRequestCache.INDICES_CACHE_QUERY_SIZE.getKey(), size.getBytes() + 1 + "b").build(), - null + (shardId -> new IndicesService.IndexShardCacheEntity(indexShard)) ); - AtomicBoolean indexShard = new AtomicBoolean(true); - ShardRequestCache requestCacheStats = new ShardRequestCache(); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -261,36 +276,39 @@ public void testEviction() throws Exception { DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); - TestEntity entity = new TestEntity(requestCacheStats, indexShard); + IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); Loader loader = new Loader(reader, 0); writer.updateDocument(new Term("id", "0"), newDoc(0, "bar")); DirectoryReader secondReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - TestEntity secondEntity = new TestEntity(requestCacheStats, indexShard); + IndicesService.IndexShardCacheEntity secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); Loader secondLoader = new Loader(secondReader, 0); writer.updateDocument(new Term("id", "0"), newDoc(0, "baz")); DirectoryReader thirdReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - TestEntity thirddEntity = new TestEntity(requestCacheStats, indexShard); + IndicesService.IndexShardCacheEntity thirddEntity = new IndicesService.IndexShardCacheEntity(indexShard); Loader thirdLoader = new Loader(thirdReader, 0); BytesReference value1 = cache.getOrCompute(entity, loader, reader, termBytes); assertEquals("foo", value1.streamInput().readString()); BytesReference value2 = cache.getOrCompute(secondEntity, secondLoader, secondReader, termBytes); assertEquals("bar", value2.streamInput().readString()); - logger.info("Memory size: {}", requestCacheStats.stats().getMemorySize()); + logger.info("Memory size: {}", indexShard.requestCache().stats().getMemorySize()); BytesReference value3 = cache.getOrCompute(thirddEntity, thirdLoader, thirdReader, termBytes); assertEquals("baz", value3.streamInput().readString()); assertEquals(2, cache.count()); - assertEquals(1, requestCacheStats.stats().getEvictions()); + assertEquals(1, indexShard.requestCache().stats().getEvictions()); IOUtils.close(reader, secondReader, thirdReader, writer, dir, cache); } public void testClearAllEntityIdentity() throws Exception { - IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, getInstanceFromNode(IndicesService.class)); - AtomicBoolean indexShard = new AtomicBoolean(true); + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexShard indexShard = createIndex("test").getShard(0); + IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, (shardId -> { + IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); + return new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id())); + })); - ShardRequestCache requestCacheStats = new ShardRequestCache(); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -298,36 +316,39 @@ public void testClearAllEntityIdentity() throws Exception { DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); - TestEntity entity = new TestEntity(requestCacheStats, indexShard); + IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); Loader loader = new Loader(reader, 0); writer.updateDocument(new Term("id", "0"), newDoc(0, "bar")); DirectoryReader secondReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - TestEntity secondEntity = new TestEntity(requestCacheStats, indexShard); + IndicesService.IndexShardCacheEntity secondEntity = new IndicesService.IndexShardCacheEntity(indexShard); Loader secondLoader = new Loader(secondReader, 0); writer.updateDocument(new Term("id", "0"), newDoc(0, "baz")); DirectoryReader thirdReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); - AtomicBoolean differentIdentity = new AtomicBoolean(true); - TestEntity thirddEntity = new TestEntity(requestCacheStats, differentIdentity); + IndicesService.IndexShardCacheEntity thirddEntity = new IndicesService.IndexShardCacheEntity(createIndex("test1").getShard(0)); Loader thirdLoader = new Loader(thirdReader, 0); BytesReference value1 = cache.getOrCompute(entity, loader, reader, termBytes); assertEquals("foo", value1.streamInput().readString()); BytesReference value2 = cache.getOrCompute(secondEntity, secondLoader, secondReader, termBytes); assertEquals("bar", value2.streamInput().readString()); - logger.info("Memory size: {}", requestCacheStats.stats().getMemorySize()); + logger.info("Memory size: {}", indexShard.requestCache().stats().getMemorySize()); BytesReference value3 = cache.getOrCompute(thirddEntity, thirdLoader, thirdReader, termBytes); assertEquals("baz", value3.streamInput().readString()); assertEquals(3, cache.count()); - final long hitCount = requestCacheStats.stats().getHitCount(); + RequestCacheStats requestCacheStats = entity.stats().stats(); + requestCacheStats.add(thirddEntity.stats().stats()); + final long hitCount = requestCacheStats.getHitCount(); // clear all for the indexShard Idendity even though is't still open cache.clear(randomFrom(entity, secondEntity)); cache.cleanCache(); assertEquals(1, cache.count()); // third has not been validated since it's a different identity value3 = cache.getOrCompute(thirddEntity, thirdLoader, thirdReader, termBytes); - assertEquals(hitCount + 1, requestCacheStats.stats().getHitCount()); + requestCacheStats = entity.stats().stats(); + requestCacheStats.add(thirddEntity.stats().stats()); + assertEquals(hitCount + 1, requestCacheStats.getHitCount()); assertEquals("baz", value3.streamInput().readString()); IOUtils.close(reader, secondReader, thirdReader, writer, dir, cache); @@ -371,8 +392,12 @@ public BytesReference get() { } public void testInvalidate() throws Exception { - ShardRequestCache requestCacheStats = new ShardRequestCache(); - IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, getInstanceFromNode(IndicesService.class)); + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexShard indexShard = createIndex("test").getShard(0); + IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, (shardId -> { + IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); + return new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id())); + })); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -380,13 +405,13 @@ public void testInvalidate() throws Exception { DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); - AtomicBoolean indexShard = new AtomicBoolean(true); // initial cache - TestEntity entity = new TestEntity(requestCacheStats, indexShard); + IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); Loader loader = new Loader(reader, 0); BytesReference value = cache.getOrCompute(entity, loader, reader, termBytes); assertEquals("foo", value.streamInput().readString()); + ShardRequestCache requestCacheStats = entity.stats(); assertEquals(0, requestCacheStats.stats().getHitCount()); assertEquals(1, requestCacheStats.stats().getMissCount()); assertEquals(0, requestCacheStats.stats().getEvictions()); @@ -394,10 +419,11 @@ public void testInvalidate() throws Exception { assertEquals(1, cache.count()); // cache hit - entity = new TestEntity(requestCacheStats, indexShard); + entity = new IndicesService.IndexShardCacheEntity(indexShard); loader = new Loader(reader, 0); value = cache.getOrCompute(entity, loader, reader, termBytes); assertEquals("foo", value.streamInput().readString()); + requestCacheStats = entity.stats(); assertEquals(1, requestCacheStats.stats().getHitCount()); assertEquals(1, requestCacheStats.stats().getMissCount()); assertEquals(0, requestCacheStats.stats().getEvictions()); @@ -407,11 +433,12 @@ public void testInvalidate() throws Exception { assertEquals(1, cache.numRegisteredCloseListeners()); // load again after invalidate - entity = new TestEntity(requestCacheStats, indexShard); + entity = new IndicesService.IndexShardCacheEntity(indexShard); loader = new Loader(reader, 0); cache.invalidate(entity, reader, termBytes); value = cache.getOrCompute(entity, loader, reader, termBytes); assertEquals("foo", value.streamInput().readString()); + requestCacheStats = entity.stats(); assertEquals(1, requestCacheStats.stats().getHitCount()); assertEquals(2, requestCacheStats.stats().getMissCount()); assertEquals(0, requestCacheStats.stats().getEvictions()); @@ -424,7 +451,7 @@ public void testInvalidate() throws Exception { if (randomBoolean()) { reader.close(); } else { - indexShard.set(false); // closed shard but reader is still open + indexShard.close("test", true, true); // closed shard but reader is still open cache.clear(entity); } cache.cleanCache(); @@ -439,25 +466,25 @@ public void testInvalidate() throws Exception { } public void testEqualsKey() throws IOException { - AtomicBoolean trueBoolean = new AtomicBoolean(true); - AtomicBoolean falseBoolean = new AtomicBoolean(false); IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndicesRequestCache indicesRequestCache = indicesService.indicesRequestCache; Directory dir = newDirectory(); IndexWriterConfig config = newIndexWriterConfig(); IndexWriter writer = new IndexWriter(dir, config); ShardId shardId = new ShardId("foo", "bar", 1); + ShardId shardId1 = new ShardId("foo1", "bar1", 2); IndexReader reader1 = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), shardId); String rKey1 = ((OpenSearchDirectoryReader) reader1).getDelegatingCacheHelper().getDelegatingCacheKey().getId(); writer.addDocument(new Document()); IndexReader reader2 = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), shardId); String rKey2 = ((OpenSearchDirectoryReader) reader2).getDelegatingCacheHelper().getDelegatingCacheKey().getId(); IOUtils.close(reader1, reader2, writer, dir); - IndicesRequestCache.Key key1 = indicesRequestCache.new Key(new TestEntity(null, trueBoolean), new TestBytesReference(1), rKey1); - IndicesRequestCache.Key key2 = indicesRequestCache.new Key(new TestEntity(null, trueBoolean), new TestBytesReference(1), rKey1); - IndicesRequestCache.Key key3 = indicesRequestCache.new Key(new TestEntity(null, falseBoolean), new TestBytesReference(1), rKey1); - IndicesRequestCache.Key key4 = indicesRequestCache.new Key(new TestEntity(null, trueBoolean), new TestBytesReference(1), rKey2); - IndicesRequestCache.Key key5 = indicesRequestCache.new Key(new TestEntity(null, trueBoolean), new TestBytesReference(2), rKey2); + IndexShard indexShard = mock(IndexShard.class); + when(indexShard.state()).thenReturn(IndexShardState.STARTED); + IndicesRequestCache.Key key1 = new IndicesRequestCache.Key(shardId, new TestBytesReference(1), rKey1); + IndicesRequestCache.Key key2 = new IndicesRequestCache.Key(shardId, new TestBytesReference(1), rKey1); + IndicesRequestCache.Key key3 = new IndicesRequestCache.Key(shardId1, new TestBytesReference(1), rKey1); + IndicesRequestCache.Key key4 = new IndicesRequestCache.Key(shardId, new TestBytesReference(1), rKey2); + IndicesRequestCache.Key key5 = new IndicesRequestCache.Key(shardId, new TestBytesReference(2), rKey2); String s = "Some other random object"; assertEquals(key1, key1); assertEquals(key1, key2); @@ -471,14 +498,11 @@ public void testEqualsKey() throws IOException { public void testSerializationDeserializationOfCacheKey() throws Exception { TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); - ShardRequestCache shardRequestCache = new ShardRequestCache(); - IndicesService indicesService = getInstanceFromNode(IndicesService.class); - IndicesRequestCache indicesRequestCache = indicesService.indicesRequestCache; IndexService indexService = createIndex("test"); IndexShard indexShard = indexService.getShard(0); - IndicesService.IndexShardCacheEntity shardCacheEntity = indicesService.new IndexShardCacheEntity(indexShard); + IndicesService.IndexShardCacheEntity shardCacheEntity = new IndicesService.IndexShardCacheEntity(indexShard); String readerCacheKeyId = UUID.randomUUID().toString(); - IndicesRequestCache.Key key1 = indicesRequestCache.new Key(shardCacheEntity, termBytes, readerCacheKeyId); + IndicesRequestCache.Key key1 = new IndicesRequestCache.Key(indexShard.shardId(), termBytes, readerCacheKeyId); BytesReference bytesReference = null; try (BytesStreamOutput out = new BytesStreamOutput()) { key1.writeTo(out); @@ -486,10 +510,10 @@ public void testSerializationDeserializationOfCacheKey() throws Exception { } StreamInput in = bytesReference.streamInput(); - IndicesRequestCache.Key key2 = indicesRequestCache.new Key(in); + IndicesRequestCache.Key key2 = new IndicesRequestCache.Key(in); assertEquals(readerCacheKeyId, key2.readerCacheKeyId); - assertEquals(shardCacheEntity.getCacheIdentity(), key2.entity.getCacheIdentity()); + assertEquals(((IndexShard) shardCacheEntity.getCacheIdentity()).shardId(), key2.shardId); assertEquals(termBytes, key2.value); } @@ -544,37 +568,4 @@ public boolean isFragment() { return false; } } - - private class TestEntity extends AbstractIndexShardCacheEntity { - private final AtomicBoolean standInForIndexShard; - private final ShardRequestCache shardRequestCache; - - private TestEntity(ShardRequestCache shardRequestCache, AtomicBoolean standInForIndexShard) { - this.standInForIndexShard = standInForIndexShard; - this.shardRequestCache = shardRequestCache; - } - - @Override - protected ShardRequestCache stats() { - return shardRequestCache; - } - - @Override - public boolean isOpen() { - return standInForIndexShard.get(); - } - - @Override - public Object getCacheIdentity() { - return standInForIndexShard; - } - - @Override - public long ramBytesUsed() { - return 42; - } - - @Override - public void writeTo(StreamOutput out) throws IOException {} - } } diff --git a/server/src/test/java/org/opensearch/indices/IndicesServiceCloseTests.java b/server/src/test/java/org/opensearch/indices/IndicesServiceCloseTests.java index 1290daadf8e6d..86a4e78f18af5 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesServiceCloseTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesServiceCloseTests.java @@ -44,19 +44,15 @@ import org.apache.lucene.search.Weight; import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.routing.allocation.DiskThresholdSettings; -import org.opensearch.common.cache.RemovalNotification; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.core.common.bytes.BytesArray; -import org.opensearch.core.common.bytes.BytesReference; -import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.env.Environment; import org.opensearch.env.NodeEnvironment; import org.opensearch.index.IndexModule; import org.opensearch.index.IndexService; import org.opensearch.index.engine.Engine; import org.opensearch.index.shard.IndexShard; -import org.opensearch.indices.IndicesRequestCache.Key; import org.opensearch.indices.breaker.HierarchyCircuitBreakerService; import org.opensearch.node.MockNode; import org.opensearch.node.Node; @@ -374,20 +370,12 @@ public void testCloseWhileOngoingRequestUsesRequestCache() throws Exception { assertEquals(1, indicesService.indicesRefCount.refCount()); assertEquals(0L, cache.count()); - IndicesRequestCache.CacheEntity cacheEntity = new IndicesRequestCache.CacheEntity() { - @Override - public void writeTo(StreamOutput out) throws IOException { - - } - + IndicesService.IndexShardCacheEntity cacheEntity = new IndicesService.IndexShardCacheEntity(shard) { @Override public long ramBytesUsed() { return 42; } - @Override - public void onCached(Key key, BytesReference value) {} - @Override public boolean isOpen() { return true; @@ -397,15 +385,6 @@ public boolean isOpen() { public Object getCacheIdentity() { return this; } - - @Override - public void onHit() {} - - @Override - public void onMiss() {} - - @Override - public void onRemoval(RemovalNotification notification) {} }; cache.getOrCompute(cacheEntity, () -> new BytesArray("bar"), searcher.getDirectoryReader(), new BytesArray("foo")); assertEquals(1L, cache.count());