From e76c9b64fdfd7c9ab02910fb2283bef0d8c1f3be Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Fri, 14 Jun 2024 15:44:51 -0700 Subject: [PATCH 1/6] Update IndicesRequestCacheIT.java Signed-off-by: Kiran Prakash --- .../indices/IndicesRequestCacheIT.java | 71 +++++++++++++------ 1 file changed, 51 insertions(+), 20 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java index 299652e4f07a9..7cac16936528f 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java @@ -56,6 +56,9 @@ import org.opensearch.core.index.shard.ShardId; import org.opensearch.env.NodeEnvironment; import org.opensearch.index.IndexNotFoundException; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.MergeSchedulerConfig; +import org.opensearch.index.TieredMergePolicyProvider; import org.opensearch.index.cache.request.RequestCacheStats; import org.opensearch.index.query.QueryBuilders; import org.opensearch.search.aggregations.bucket.global.GlobalAggregationBuilder; @@ -126,6 +129,8 @@ public void testCacheAggs() throws Exception { .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true) .put(SETTING_NUMBER_OF_SHARDS, 1) .put(SETTING_NUMBER_OF_REPLICAS, 0) + // Disable index refreshing to avoid cache being invalidated mid-test + .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(-1)) ) .get() ); @@ -134,7 +139,8 @@ public void testCacheAggs() throws Exception { client.prepareIndex(index).setSource("f", "2014-03-10T00:00:00.000Z"), client.prepareIndex(index).setSource("f", "2014-05-13T00:00:00.000Z") ); - ensureSearchable(index); + // Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache + forceMerge(client, index); // This is not a random example: serialization with time zones writes shared strings // which used to not work well with the query cache because of the handles stream output @@ -197,6 +203,8 @@ public void testQueryRewrite() throws Exception { .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 5) .put("index.number_of_routing_shards", 5) .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + // Disable index refreshing to avoid cache being invalidated mid-test + .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(-1)) ) .get() ); @@ -216,10 +224,7 @@ public void testQueryRewrite() throws Exception { assertCacheState(client, index, 0, 0); // Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache - ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge(index).setFlush(true).get(); - OpenSearchAssertions.assertAllSuccessful(forceMergeResponse); - refreshAndWaitForReplication(); - ensureSearchable(index); + forceMerge(client, index); assertCacheState(client, index, 0, 0); @@ -268,6 +273,8 @@ public void testQueryRewriteMissingValues() throws Exception { .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true) .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + // Disable index refreshing to avoid cache being invalidated mid-test + .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(-1)) ) .get() ); @@ -287,10 +294,7 @@ public void testQueryRewriteMissingValues() throws Exception { assertCacheState(client, index, 0, 0); // Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache - ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge(index).setFlush(true).get(); - OpenSearchAssertions.assertAllSuccessful(forceMergeResponse); - refreshAndWaitForReplication(); - ensureSearchable(index); + forceMerge(client, index); assertCacheState(client, index, 0, 0); @@ -335,6 +339,8 @@ public void testQueryRewriteDates() throws Exception { .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true) .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + // Disable index refreshing to avoid cache being invalidated mid-test + .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(-1)) ) .get() ); @@ -354,10 +360,7 @@ public void testQueryRewriteDates() throws Exception { assertCacheState(client, index, 0, 0); // Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache - ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge(index).setFlush(true).get(); - OpenSearchAssertions.assertAllSuccessful(forceMergeResponse); - refreshAndWaitForReplication(); - ensureSearchable(index); + forceMerge(client, index); assertCacheState(client, index, 0, 0); @@ -399,6 +402,8 @@ public void testQueryRewriteDatesWithNow() throws Exception { .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true) .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + // Disable index refreshing to avoid cache being invalidated mid-test + .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(-1)) .build(); assertAcked(client.admin().indices().prepareCreate("index-1").setMapping("d", "type=date").setSettings(settings).get()); assertAcked(client.admin().indices().prepareCreate("index-2").setMapping("d", "type=date").setSettings(settings).get()); @@ -480,6 +485,7 @@ public void testCanCache() throws Exception { .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2) .put("index.number_of_routing_shards", 2) .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(-1)) .build(); String index = "index"; assertAcked(client.admin().indices().prepareCreate(index).setMapping("s", "type=date").setSettings(settings).get()); @@ -499,10 +505,7 @@ public void testCanCache() throws Exception { assertCacheState(client, index, 0, 0); // Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache - ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge(index).setFlush(true).get(); - OpenSearchAssertions.assertAllSuccessful(forceMergeResponse); - refreshAndWaitForReplication(); - ensureSearchable(index); + forceMerge(client, index); assertCacheState(client, index, 0, 0); @@ -644,11 +647,14 @@ public void testProfileDisableCache() throws Exception { .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true) .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + // Disable index refreshing to avoid cache being invalidated mid-test + .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(-1)) ) .get() ); indexRandom(true, client.prepareIndex(index).setSource("k", "hello")); - ensureSearchable(index); + // Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache + forceMerge(client, index); int expectedHits = 0; int expectedMisses = 0; @@ -687,11 +693,14 @@ public void testCacheWithInvalidation() throws Exception { .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) .put("index.refresh_interval", -1) + // Disable index refreshing to avoid cache being invalidated mid-test + .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(-1)) ) .get() ); indexRandom(true, client.prepareIndex(index).setSource("k", "hello")); - ensureSearchable(index); + // Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache + forceMerge(client, index); SearchResponse resp = client.prepareSearch(index).setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get(); assertSearchResponse(resp); OpenSearchAssertions.assertAllSuccessful(resp); @@ -1238,7 +1247,12 @@ public void testDeleteAndCreateSameIndexShardOnSameNode() throws Exception { logger.info("Creating an index: {} with 2 shards", indexName); createIndex( indexName, - Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build() + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + // Disable index refreshing to avoid cache being invalidated mid-test + .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(-1)) + .build() ); ensureGreen(indexName); @@ -1246,6 +1260,8 @@ public void testDeleteAndCreateSameIndexShardOnSameNode() throws Exception { logger.info("Writing few docs and searching those which will cache items in RequestCache"); indexRandom(true, client.prepareIndex(indexName).setSource("k", "hello")); indexRandom(true, client.prepareIndex(indexName).setSource("y", "hello again")); + // Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache + forceMerge(client, indexName); SearchResponse resp = client.prepareSearch(indexName).setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get(); assertSearchResponse(resp); resp = client.prepareSearch(indexName).setRequestCache(true).setQuery(QueryBuilders.termQuery("y", "hello")).get(); @@ -1329,11 +1345,26 @@ private void setupIndex(Client client, String index) throws Exception { .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true) .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + // Disable index refreshing to avoid cache being invalidated mid-test + .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(-1)) + // settings to reduce the chances of background segment merges invalidating the cache + .put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING.getKey(), 1000d) + .put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.getKey(), 2) + .put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGED_SEGMENT_SETTING.getKey(), "100gb") + .put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), 1) + .put(MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING.getKey(), 1) ) .get() ); + indexRandom(true, client.prepareIndex(index).setSource("k", "hello")); indexRandom(true, client.prepareIndex(index).setSource("k", "there")); + } + + private void forceMerge(Client client, String index) { + ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge(index).setFlush(true).get(); + OpenSearchAssertions.assertAllSuccessful(forceMergeResponse); + refreshAndWaitForReplication(); ensureSearchable(index); } From fdec19c0b23b86564efb834e0c43974180ea0104 Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Fri, 14 Jun 2024 17:11:12 -0700 Subject: [PATCH 2/6] Update IndicesRequestCacheIT.java Signed-off-by: Kiran Prakash --- .../java/org/opensearch/indices/IndicesRequestCacheIT.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java index 7cac16936528f..e120b289ded4d 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java @@ -725,6 +725,7 @@ public void testCacheWithInvalidation() throws Exception { // calling cache clear api, when staleness threshold is lower than staleness, it should clean the stale keys from cache public void testCacheClearAPIRemovesStaleKeysWhenStalenessThresholdIsLow() throws Exception { + String node = internalCluster().startNode( Settings.builder() .put(IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_STALENESS_THRESHOLD_SETTING_KEY, 0.10) From 43940a8d8e7da186729fb63d5ccb212d6d1d3a23 Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Fri, 14 Jun 2024 17:11:32 -0700 Subject: [PATCH 3/6] Update IndicesRequestCacheIT.java Signed-off-by: Kiran Prakash --- .../java/org/opensearch/indices/IndicesRequestCacheIT.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java index e120b289ded4d..7cac16936528f 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java @@ -725,7 +725,6 @@ public void testCacheWithInvalidation() throws Exception { // calling cache clear api, when staleness threshold is lower than staleness, it should clean the stale keys from cache public void testCacheClearAPIRemovesStaleKeysWhenStalenessThresholdIsLow() throws Exception { - String node = internalCluster().startNode( Settings.builder() .put(IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_STALENESS_THRESHOLD_SETTING_KEY, 0.10) From 9e3c4c16a2424142011a1ecae44ed92c752c5c17 Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Mon, 17 Jun 2024 22:45:50 -0700 Subject: [PATCH 4/6] ensure searchable Signed-off-by: Kiran Prakash --- .../org/opensearch/indices/IndicesRequestCacheIT.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java index 7cac16936528f..8db239f23692e 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java @@ -139,6 +139,7 @@ public void testCacheAggs() throws Exception { client.prepareIndex(index).setSource("f", "2014-03-10T00:00:00.000Z"), client.prepareIndex(index).setSource("f", "2014-05-13T00:00:00.000Z") ); + ensureSearchable(index); // Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache forceMerge(client, index); @@ -222,7 +223,6 @@ public void testQueryRewrite() throws Exception { ); ensureSearchable(index); assertCacheState(client, index, 0, 0); - // Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache forceMerge(client, index); @@ -653,6 +653,7 @@ public void testProfileDisableCache() throws Exception { .get() ); indexRandom(true, client.prepareIndex(index).setSource("k", "hello")); + ensureSearchable(index); // Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache forceMerge(client, index); @@ -699,6 +700,7 @@ public void testCacheWithInvalidation() throws Exception { .get() ); indexRandom(true, client.prepareIndex(index).setSource("k", "hello")); + ensureSearchable(index); // Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache forceMerge(client, index); SearchResponse resp = client.prepareSearch(index).setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get(); @@ -1260,6 +1262,7 @@ public void testDeleteAndCreateSameIndexShardOnSameNode() throws Exception { logger.info("Writing few docs and searching those which will cache items in RequestCache"); indexRandom(true, client.prepareIndex(indexName).setSource("k", "hello")); indexRandom(true, client.prepareIndex(indexName).setSource("y", "hello again")); + ensureSearchable(indexName); // Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache forceMerge(client, indexName); SearchResponse resp = client.prepareSearch(indexName).setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get(); @@ -1359,13 +1362,14 @@ private void setupIndex(Client client, String index) throws Exception { indexRandom(true, client.prepareIndex(index).setSource("k", "hello")); indexRandom(true, client.prepareIndex(index).setSource("k", "there")); + ensureSearchable(index); + forceMerge(client, index); } private void forceMerge(Client client, String index) { ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge(index).setFlush(true).get(); OpenSearchAssertions.assertAllSuccessful(forceMergeResponse); refreshAndWaitForReplication(); - ensureSearchable(index); } private void createCacheEntry(Client client, String index, String value) { From ab90861ff6da749a811ac3dfe117577545ac133a Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Tue, 18 Jun 2024 18:04:12 -0700 Subject: [PATCH 5/6] move cleanup tests to own class Signed-off-by: Kiran Prakash --- .../indices/IndicesRequestCacheCleanupIT.java | 712 ++++++++++++++++++ .../indices/IndicesRequestCacheIT.java | 596 --------------- 2 files changed, 712 insertions(+), 596 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheCleanupIT.java diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheCleanupIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheCleanupIT.java new file mode 100644 index 0000000000000..6ccd453ee0ac5 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheCleanupIT.java @@ -0,0 +1,712 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.indices; + +import org.opensearch.action.admin.cluster.node.stats.NodeStats; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.opensearch.action.admin.indices.cache.clear.ClearIndicesCacheRequest; +import org.opensearch.action.admin.indices.forcemerge.ForceMergeResponse; +import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.client.Client; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.index.IndexNotFoundException; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.MergePolicyProvider; +import org.opensearch.index.cache.request.RequestCacheStats; +import org.opensearch.index.query.QueryBuilders; +import org.opensearch.plugins.Plugin; +import org.opensearch.test.InternalSettingsPlugin; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.hamcrest.OpenSearchAssertions; + +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.TimeUnit; + +import static org.opensearch.indices.IndicesRequestCache.INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING; +import static org.opensearch.indices.IndicesService.INDICES_CACHE_CLEANUP_INTERVAL_SETTING_KEY; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse; +import static org.hamcrest.Matchers.equalTo; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, supportsDedicatedMasters = false) +public class IndicesRequestCacheCleanupIT extends OpenSearchIntegTestCase { + + @Override + protected Collection> nodePlugins() { + return Arrays.asList(InternalSettingsPlugin.class); + } + + public void testCacheWithInvalidation() throws Exception { + Client client = client(); + String index = "index"; + assertAcked( + client.admin() + .indices() + .prepareCreate(index) + .setMapping("k", "type=keyword") + .setSettings( + Settings.builder() + .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put("index.refresh_interval", -1) + // Disable index refreshing to avoid cache being invalidated mid-test + .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(-1)) + ) + .get() + ); + indexRandom(true, client.prepareIndex(index).setSource("k", "hello")); + ensureSearchable(index); + // Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache + forceMerge(client, index); + SearchResponse resp = client.prepareSearch(index).setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get(); + assertSearchResponse(resp); + OpenSearchAssertions.assertAllSuccessful(resp); + assertThat(resp.getHits().getTotalHits().value, equalTo(1L)); + + assertCacheState(client, index, 0, 1); + // Index but don't refresh + indexRandom(false, client.prepareIndex(index).setSource("k", "hello2")); + resp = client.prepareSearch(index).setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get(); + assertSearchResponse(resp); + // Should expect hit as here as refresh didn't happen + assertCacheState(client, index, 1, 1); + + // Explicit refresh would invalidate cache + refreshAndWaitForReplication(); + // Hit same query again + resp = client.prepareSearch(index).setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get(); + assertSearchResponse(resp); + // Should expect miss as key has changed due to change in IndexReader.CacheKey (due to refresh) + assertCacheState(client, index, 1, 2); + } + + // calling cache clear api, when staleness threshold is lower than staleness, it should clean the stale keys from cache + public void testCacheClearAPIRemovesStaleKeysWhenStalenessThresholdIsLow() throws Exception { + String node = internalCluster().startNode( + Settings.builder() + .put(IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_STALENESS_THRESHOLD_SETTING_KEY, 0.10) + .put( + IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_INTERVAL_SETTING_KEY, + // setting intentionally high to avoid cache cleaner interfering + TimeValue.timeValueMillis(300) + ) + ); + Client client = client(node); + String index1 = "index1"; + String index2 = "index2"; + setupIndex(client, index1); + setupIndex(client, index2); + + // create first cache entry in index1 + createCacheEntry(client, index1, "hello"); + assertCacheState(client, index1, 0, 1); + long memorySizeForIndex1 = getRequestCacheStats(client, index1).getMemorySizeInBytes(); + assertTrue(memorySizeForIndex1 > 0); + + // create second cache entry in index1 + createCacheEntry(client, index1, "there"); + assertCacheState(client, index1, 0, 2); + long finalMemorySizeForIndex1 = getRequestCacheStats(client, index1).getMemorySizeInBytes(); + assertTrue(finalMemorySizeForIndex1 > memorySizeForIndex1); + + // create first cache entry in index2 + createCacheEntry(client, index2, "hello"); + assertCacheState(client, index2, 0, 1); + assertTrue(getRequestCacheStats(client, index2).getMemorySizeInBytes() > 0); + + ClearIndicesCacheRequest clearIndicesCacheRequest = new ClearIndicesCacheRequest(index2); + client.admin().indices().clearCache(clearIndicesCacheRequest).actionGet(); + + // cache cleaner should have cleaned up the stale key from index 2 + assertEquals(0, getRequestCacheStats(client, index2).getMemorySizeInBytes()); + // cache cleaner should NOT have cleaned from index 1 + assertEquals(finalMemorySizeForIndex1, getRequestCacheStats(client, index1).getMemorySizeInBytes()); + } + + // when staleness threshold is lower than staleness, it should clean the stale keys from cache + public void testStaleKeysCleanupWithLowThreshold() throws Exception { + int cacheCleanIntervalInMillis = 1; + String node = internalCluster().startNode( + Settings.builder() + .put(IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_STALENESS_THRESHOLD_SETTING_KEY, 0.10) + .put( + IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_INTERVAL_SETTING_KEY, + TimeValue.timeValueMillis(cacheCleanIntervalInMillis) + ) + ); + Client client = client(node); + String index1 = "index1"; + String index2 = "index2"; + setupIndex(client, index1); + setupIndex(client, index2); + + // create first cache entry in index1 + createCacheEntry(client, index1, "hello"); + assertCacheState(client, index1, 0, 1); + long memorySizeForIndex1 = getRequestCacheStats(client, index1).getMemorySizeInBytes(); + assertTrue(memorySizeForIndex1 > 0); + + // create second cache entry in index1 + createCacheEntry(client, index1, "there"); + assertCacheState(client, index1, 0, 2); + long finalMemorySizeForIndex1 = getRequestCacheStats(client, index1).getMemorySizeInBytes(); + assertTrue(finalMemorySizeForIndex1 > memorySizeForIndex1); + + // create first cache entry in index2 + createCacheEntry(client, index2, "hello"); + assertCacheState(client, index2, 0, 1); + assertTrue(getRequestCacheStats(client, index2).getMemorySizeInBytes() > 0); + + // create 1 stale key + indexRandom(false, client.prepareIndex(index2).setId("1").setSource("d", "hello")); + forceMerge(client, index2); + // sleep until cache cleaner would have cleaned up the stale key from index 2 + assertBusy(() -> { + // cache cleaner should have cleaned up the stale key from index 2 + assertEquals(0, getRequestCacheStats(client, index2).getMemorySizeInBytes()); + // cache cleaner should NOT have cleaned from index 1 + assertEquals(finalMemorySizeForIndex1, getRequestCacheStats(client, index1).getMemorySizeInBytes()); + }, cacheCleanIntervalInMillis * 2, TimeUnit.MILLISECONDS); + // sleep until cache cleaner would have cleaned up the stale key from index 2 + } + + // when staleness threshold is equal to staleness, it should clean the stale keys from cache + public void testCacheCleanupOnEqualStalenessAndThreshold() throws Exception { + int cacheCleanIntervalInMillis = 1; + String node = internalCluster().startNode( + Settings.builder() + .put(IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_STALENESS_THRESHOLD_SETTING_KEY, 0.33) + .put( + IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_INTERVAL_SETTING_KEY, + TimeValue.timeValueMillis(cacheCleanIntervalInMillis) + ) + ); + Client client = client(node); + String index1 = "index1"; + String index2 = "index2"; + setupIndex(client, index1); + setupIndex(client, index2); + + // create first cache entry in index1 + createCacheEntry(client, index1, "hello"); + assertCacheState(client, index1, 0, 1); + long memorySizeForIndex1 = getRequestCacheStats(client, index1).getMemorySizeInBytes(); + assertTrue(memorySizeForIndex1 > 0); + + // create second cache entry in index1 + createCacheEntry(client, index1, "there"); + assertCacheState(client, index1, 0, 2); + long finalMemorySizeForIndex1 = getRequestCacheStats(client, index1).getMemorySizeInBytes(); + assertTrue(finalMemorySizeForIndex1 > memorySizeForIndex1); + + // create first cache entry in index2 + createCacheEntry(client, index2, "hello"); + assertCacheState(client, index2, 0, 1); + assertTrue(getRequestCacheStats(client, index2).getMemorySizeInBytes() > 0); + + // create 1 stale key + indexRandom(false, client.prepareIndex(index2).setId("1").setSource("d", "hello")); + forceMerge(client, index2); + // sleep until cache cleaner would have cleaned up the stale key from index 2 + assertBusy(() -> { + // cache cleaner should have cleaned up the stale key from index 2 + assertEquals(0, getRequestCacheStats(client, index2).getMemorySizeInBytes()); + // cache cleaner should NOT have cleaned from index 1 + assertEquals(finalMemorySizeForIndex1, getRequestCacheStats(client, index1).getMemorySizeInBytes()); + }, cacheCleanIntervalInMillis * 2, TimeUnit.MILLISECONDS); + } + + // when staleness threshold is higher than staleness, it should NOT clean the cache + public void testCacheCleanupSkipsWithHighStalenessThreshold() throws Exception { + int cacheCleanIntervalInMillis = 1; + String node = internalCluster().startNode( + Settings.builder() + .put(IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_STALENESS_THRESHOLD_SETTING_KEY, 0.90) + .put( + IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_INTERVAL_SETTING_KEY, + TimeValue.timeValueMillis(cacheCleanIntervalInMillis) + ) + ); + Client client = client(node); + String index1 = "index1"; + String index2 = "index2"; + setupIndex(client, index1); + setupIndex(client, index2); + + // create first cache entry in index1 + createCacheEntry(client, index1, "hello"); + assertCacheState(client, index1, 0, 1); + long memorySizeForIndex1 = getRequestCacheStats(client, index1).getMemorySizeInBytes(); + assertTrue(memorySizeForIndex1 > 0); + + // create second cache entry in index1 + createCacheEntry(client, index1, "there"); + assertCacheState(client, index1, 0, 2); + long finalMemorySizeForIndex1 = getRequestCacheStats(client, index1).getMemorySizeInBytes(); + assertTrue(finalMemorySizeForIndex1 > memorySizeForIndex1); + + // create first cache entry in index2 + createCacheEntry(client, index2, "hello"); + assertCacheState(client, index2, 0, 1); + assertTrue(getRequestCacheStats(client, index2).getMemorySizeInBytes() > 0); + + // force refresh so that it creates 1 stale key + flushAndRefresh(index2); + // sleep until cache cleaner would have cleaned up the stale key from index 2 + assertBusy(() -> { + // cache cleaner should NOT have cleaned up the stale key from index 2 + assertTrue(getRequestCacheStats(client, index2).getMemorySizeInBytes() > 0); + // cache cleaner should NOT have cleaned from index 1 + assertEquals(finalMemorySizeForIndex1, getRequestCacheStats(client, index1).getMemorySizeInBytes()); + }, cacheCleanIntervalInMillis * 2, TimeUnit.MILLISECONDS); + } + + // when staleness threshold is explicitly set to 0, cache cleaner regularly cleans up stale keys. + public void testCacheCleanupOnZeroStalenessThreshold() throws Exception { + int cacheCleanIntervalInMillis = 50; + String node = internalCluster().startNode( + Settings.builder() + .put(IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_STALENESS_THRESHOLD_SETTING_KEY, 0) + .put( + IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_INTERVAL_SETTING_KEY, + TimeValue.timeValueMillis(cacheCleanIntervalInMillis) + ) + ); + Client client = client(node); + String index1 = "index1"; + String index2 = "index2"; + setupIndex(client, index1); + setupIndex(client, index2); + + // create 10 index1 cache entries + for (int i = 1; i <= 10; i++) { + long cacheSizeBefore = getRequestCacheStats(client, index1).getMemorySizeInBytes(); + createCacheEntry(client, index1, "hello" + i); + assertCacheState(client, index1, 0, i); + long cacheSizeAfter = getRequestCacheStats(client, index1).getMemorySizeInBytes(); + assertTrue(cacheSizeAfter > cacheSizeBefore); + } + + long finalMemorySizeForIndex1 = getRequestCacheStats(client, index1).getMemorySizeInBytes(); + + // create first cache entry in index2 + createCacheEntry(client, index2, "hello"); + assertCacheState(client, index2, 0, 1); + assertTrue(getRequestCacheStats(client, index2).getMemorySizeInBytes() > 0); + + // create 1 stale key + indexRandom(false, client.prepareIndex(index2).setId("1").setSource("d", "hello")); + forceMerge(client, index2); + // sleep until cache cleaner would have cleaned up the stale key from index 2 + assertBusy(() -> { + // cache cleaner should have cleaned up the stale key from index 2 + assertEquals(0, getRequestCacheStats(client, index2).getMemorySizeInBytes()); + // cache cleaner should NOT have cleaned from index 1 + assertEquals(finalMemorySizeForIndex1, getRequestCacheStats(client, index1).getMemorySizeInBytes()); + }, cacheCleanIntervalInMillis * 2, TimeUnit.MILLISECONDS); + } + + // when staleness threshold is not explicitly set, cache cleaner regularly cleans up stale keys + public void testStaleKeysRemovalWithoutExplicitThreshold() throws Exception { + int cacheCleanIntervalInMillis = 1; + String node = internalCluster().startNode( + Settings.builder() + .put( + IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_INTERVAL_SETTING_KEY, + TimeValue.timeValueMillis(cacheCleanIntervalInMillis) + ) + ); + String index1 = "index1"; + String index2 = "index2"; + Client client = client(node); + setupIndex(client, index1); + setupIndex(client, index2); + + // create first cache entry in index1 + createCacheEntry(client, index1, "hello"); + assertCacheState(client, index1, 0, 1); + long memorySizeForIndex1 = getRequestCacheStats(client, index1).getMemorySizeInBytes(); + assertTrue(memorySizeForIndex1 > 0); + + // create second cache entry in index1 + createCacheEntry(client, index1, "there"); + assertCacheState(client, index1, 0, 2); + long finalMemorySizeForIndex1 = getRequestCacheStats(client, index1).getMemorySizeInBytes(); + assertTrue(finalMemorySizeForIndex1 > memorySizeForIndex1); + + // create first cache entry in index2 + createCacheEntry(client, index2, "hello"); + assertCacheState(client, index2, 0, 1); + assertTrue(getRequestCacheStats(client, index2).getMemorySizeInBytes() > 0); + + // force refresh so that it creates 1 stale key + indexRandom(false, client.prepareIndex(index2).setId("1").setSource("d", "hello")); + forceMerge(client, index2); + // sleep until cache cleaner would have cleaned up the stale key from index 2 + assertBusy(() -> { + assertEquals(2, getSegmentCount(client, index2)); + // cache cleaner should have cleaned up the stale key from index 2 + assertEquals(0, getRequestCacheStats(client, index2).getMemorySizeInBytes()); + // cache cleaner should NOT have cleaned from index 1 + assertEquals(finalMemorySizeForIndex1, getRequestCacheStats(client, index1).getMemorySizeInBytes()); + }, cacheCleanIntervalInMillis * 2, TimeUnit.MILLISECONDS); + } + + private int getSegmentCount(Client client, String indexName) { + return client.admin() + .indices() + .segments(new IndicesSegmentsRequest(indexName)) + .actionGet() + .getIndices() + .get(indexName) + .getShards() + .get(0) + .getShards()[0].getSegments() + .size(); + } + + // when cache cleaner interval setting is not set, cache cleaner is configured appropriately with the fall-back setting + public void testCacheCleanupWithDefaultSettings() throws Exception { + int cacheCleanIntervalInMillis = 1; + String node = internalCluster().startNode( + Settings.builder().put(INDICES_CACHE_CLEANUP_INTERVAL_SETTING_KEY, TimeValue.timeValueMillis(cacheCleanIntervalInMillis)) + ); + Client client = client(node); + String index1 = "index1"; + String index2 = "index2"; + setupIndex(client, index1); + setupIndex(client, index2); + + // create first cache entry in index1 + createCacheEntry(client, index1, "hello"); + assertCacheState(client, index1, 0, 1); + long memorySizeForIndex1 = getRequestCacheStats(client, index1).getMemorySizeInBytes(); + assertTrue(memorySizeForIndex1 > 0); + + // create second cache entry in index1 + createCacheEntry(client, index1, "there"); + assertCacheState(client, index1, 0, 2); + long finalMemorySizeForIndex1 = getRequestCacheStats(client, index1).getMemorySizeInBytes(); + assertTrue(finalMemorySizeForIndex1 > memorySizeForIndex1); + + // create first cache entry in index2 + createCacheEntry(client, index2, "hello"); + assertCacheState(client, index2, 0, 1); + assertTrue(getRequestCacheStats(client, index2).getMemorySizeInBytes() > 0); + + // create 1 stale key + indexRandom(false, client.prepareIndex(index2).setId("1").setSource("d", "hello")); + forceMerge(client, index2); + // sleep until cache cleaner would have cleaned up the stale key from index 2 + assertBusy(() -> { + // cache cleaner should have cleaned up the stale key from index 2 + assertEquals(0, getRequestCacheStats(client, index2).getMemorySizeInBytes()); + // cache cleaner should NOT have cleaned from index 1 + assertEquals(finalMemorySizeForIndex1, getRequestCacheStats(client, index1).getMemorySizeInBytes()); + }, cacheCleanIntervalInMillis * 2, TimeUnit.MILLISECONDS); + } + + // staleness threshold updates flows through to the cache cleaner + public void testDynamicStalenessThresholdUpdate() throws Exception { + int cacheCleanIntervalInMillis = 1; + String node = internalCluster().startNode( + Settings.builder() + .put(IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_STALENESS_THRESHOLD_SETTING_KEY, 0.90) + .put( + IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_INTERVAL_SETTING_KEY, + TimeValue.timeValueMillis(cacheCleanIntervalInMillis) + ) + ); + Client client = client(node); + String index1 = "index1"; + String index2 = "index2"; + setupIndex(client, index1); + setupIndex(client, index2); + + // create first cache entry in index1 + createCacheEntry(client, index1, "hello"); + assertCacheState(client, index1, 0, 1); + long memorySizeForIndex1 = getRequestCacheStats(client, index1).getMemorySizeInBytes(); + assertTrue(memorySizeForIndex1 > 0); + + // create second cache entry in index1 + createCacheEntry(client, index1, "there"); + assertCacheState(client, index1, 0, 2); + assertTrue(getRequestCacheStats(client, index1).getMemorySizeInBytes() > memorySizeForIndex1); + + // create first cache entry in index2 + createCacheEntry(client, index2, "hello"); + assertCacheState(client, index2, 0, 1); + long finalMemorySizeForIndex1 = getRequestCacheStats(client, index1).getMemorySizeInBytes(); + assertTrue(finalMemorySizeForIndex1 > 0); + + // create 1 stale key + indexRandom(false, client.prepareIndex(index2).setId("1").setSource("d", "hello")); + forceMerge(client, index2); + assertBusy(() -> { + // cache cleaner should NOT have cleaned up the stale key from index 2 + assertTrue(getRequestCacheStats(client, index2).getMemorySizeInBytes() > 0); + }, cacheCleanIntervalInMillis * 2, TimeUnit.MILLISECONDS); + + // Update indices.requests.cache.cleanup.staleness_threshold to "10%" + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + updateSettingsRequest.persistentSettings(Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), 0.10)); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + + assertBusy(() -> { + // cache cleaner should have cleaned up the stale key from index 2 + assertEquals(0, getRequestCacheStats(client, index2).getMemorySizeInBytes()); + // cache cleaner should NOT have cleaned from index 1 + assertEquals(finalMemorySizeForIndex1, getRequestCacheStats(client, index1).getMemorySizeInBytes()); + }, cacheCleanIntervalInMillis * 2, TimeUnit.MILLISECONDS); + } + + // staleness threshold dynamic updates should throw exceptions on invalid input + public void testInvalidStalenessThresholdUpdateThrowsException() throws Exception { + // Update indices.requests.cache.cleanup.staleness_threshold to "10%" with illegal argument + assertThrows("Ratio should be in [0-1.0]", IllegalArgumentException.class, () -> { + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + updateSettingsRequest.persistentSettings( + Settings.builder().put(IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_STALENESS_THRESHOLD_SETTING_KEY, 10) + ); + client().admin().cluster().updateSettings(updateSettingsRequest).actionGet(); + }); + } + + // closing the Index after caching will clean up from Indices Request Cache + public void testCacheClearanceAfterIndexClosure() throws Exception { + int cacheCleanIntervalInMillis = 100; + String node = internalCluster().startNode( + Settings.builder() + .put(IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_STALENESS_THRESHOLD_SETTING_KEY, 0.10) + .put( + IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_INTERVAL_SETTING_KEY, + TimeValue.timeValueMillis(cacheCleanIntervalInMillis) + ) + ); + Client client = client(node); + String index = "index"; + setupIndex(client, index); + + // assert there are no entries in the cache for index + assertEquals(0, getRequestCacheStats(client, index).getMemorySizeInBytes()); + // assert there are no entries in the cache from other indices in the node + assertEquals(0, getNodeCacheStats(client).getMemorySizeInBytes()); + // create first cache entry in index + createCacheEntry(client, index, "hello"); + assertCacheState(client, index, 0, 1); + assertTrue(getRequestCacheStats(client, index).getMemorySizeInBytes() > 0); + assertTrue(getNodeCacheStats(client).getMemorySizeInBytes() > 0); + + // close index + assertAcked(client.admin().indices().prepareClose(index)); + // request cache stats cannot be access since Index should be closed + try { + getRequestCacheStats(client, index); + } catch (Exception e) { + assert (e instanceof IndexClosedException); + } + // sleep until cache cleaner would have cleaned up the stale key from index + assertBusy(() -> { + // cache cleaner should have cleaned up the stale keys from index + assertEquals(0, getNodeCacheStats(client).getMemorySizeInBytes()); + }, cacheCleanIntervalInMillis * 2, TimeUnit.MILLISECONDS); + } + + // deleting the Index after caching will clean up from Indices Request Cache + public void testCacheCleanupAfterIndexDeletion() throws Exception { + int cacheCleanIntervalInMillis = 100; + String node = internalCluster().startNode( + Settings.builder() + .put(IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_STALENESS_THRESHOLD_SETTING_KEY, 0.10) + .put( + IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_INTERVAL_SETTING_KEY, + TimeValue.timeValueMillis(cacheCleanIntervalInMillis) + ) + ); + Client client = client(node); + String index = "index"; + setupIndex(client, index); + + // assert there are no entries in the cache for index + assertEquals(0, getRequestCacheStats(client, index).getMemorySizeInBytes()); + // assert there are no entries in the cache from other indices in the node + assertEquals(0, getNodeCacheStats(client).getMemorySizeInBytes()); + // create first cache entry in index + createCacheEntry(client, index, "hello"); + assertCacheState(client, index, 0, 1); + assertTrue(getRequestCacheStats(client, index).getMemorySizeInBytes() > 0); + assertTrue(getNodeCacheStats(client).getMemorySizeInBytes() > 0); + + // delete index + assertAcked(client.admin().indices().prepareDelete(index)); + // request cache stats cannot be access since Index should be deleted + try { + getRequestCacheStats(client, index); + } catch (Exception e) { + assert (e instanceof IndexNotFoundException); + } + + // sleep until cache cleaner would have cleaned up the stale key from index + assertBusy(() -> { + // cache cleaner should have cleaned up the stale keys from index + assertEquals(0, getNodeCacheStats(client).getMemorySizeInBytes()); + }, cacheCleanIntervalInMillis * 2, TimeUnit.MILLISECONDS); + } + + // when staleness threshold is lower than staleness, it should clean the cache from all indices having stale keys + public void testStaleKeysCleanupWithMultipleIndices() throws Exception { + int cacheCleanIntervalInMillis = 10; + String node = internalCluster().startNode( + Settings.builder() + .put(IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_STALENESS_THRESHOLD_SETTING_KEY, 0.10) + .put( + IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_INTERVAL_SETTING_KEY, + TimeValue.timeValueMillis(cacheCleanIntervalInMillis) + ) + ); + Client client = client(node); + String index1 = "index1"; + String index2 = "index2"; + setupIndex(client, index1); + setupIndex(client, index2); + + // assert cache is empty for index1 + assertEquals(0, getRequestCacheStats(client, index1).getMemorySizeInBytes()); + // create first cache entry in index1 + createCacheEntry(client, index1, "hello"); + assertCacheState(client, index1, 0, 1); + long memorySizeForIndex1With1Entries = getRequestCacheStats(client, index1).getMemorySizeInBytes(); + assertTrue(memorySizeForIndex1With1Entries > 0); + + // create second cache entry in index1 + createCacheEntry(client, index1, "there"); + assertCacheState(client, index1, 0, 2); + long memorySizeForIndex1With2Entries = getRequestCacheStats(client, index1).getMemorySizeInBytes(); + assertTrue(memorySizeForIndex1With2Entries > memorySizeForIndex1With1Entries); + + // assert cache is empty for index2 + assertEquals(0, getRequestCacheStats(client, index2).getMemorySizeInBytes()); + // create first cache entry in index2 + createCacheEntry(client, index2, "hello"); + assertCacheState(client, index2, 0, 1); + assertTrue(getRequestCacheStats(client, index2).getMemorySizeInBytes() > 0); + + // invalidate the cache for index1 + indexRandom(false, client.prepareIndex(index1).setId("1").setSource("d", "hello")); + forceMerge(client, index1); + // invalidate the cache for index2 + indexRandom(false, client.prepareIndex(index2).setId("1").setSource("d", "hello")); + forceMerge(client, index2); + // create another cache entry in index 1 same as memorySizeForIndex1With1Entries, this should not be cleaned up. + createCacheEntry(client, index1, "hello"); + // sleep until cache cleaner would have cleaned up the stale key from index2 + assertBusy(() -> { + // cache cleaner should have cleaned up the stale key from index2 and hence cache should be empty + assertEquals(0, getRequestCacheStats(client, index2).getMemorySizeInBytes()); + // cache cleaner should have only cleaned up the stale entities for index1 + long currentMemorySizeInBytesForIndex1 = getRequestCacheStats(client, index1).getMemorySizeInBytes(); + // assert the memory size of index1 to only contain 1 entry added after flushAndRefresh + assertEquals(memorySizeForIndex1With1Entries, currentMemorySizeInBytesForIndex1); + // cache for index1 should not be empty since there was an item cached after flushAndRefresh + assertTrue(currentMemorySizeInBytesForIndex1 > 0); + }, cacheCleanIntervalInMillis * 2, TimeUnit.MILLISECONDS); + } + + private void setupIndex(Client client, String index) throws Exception { + assertAcked( + client.admin() + .indices() + .prepareCreate(index) + .setMapping("k", "type=keyword") + .setSettings( + Settings.builder() + .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + // Disable index refreshing to avoid cache being invalidated mid-test + .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(-1)) + // Disable background segment merges invalidating the cache + .put(MergePolicyProvider.INDEX_MERGE_ENABLED, false) + ) + .get() + ); + indexRandom(false, client.prepareIndex(index).setSource("k", "hello")); + indexRandom(false, client.prepareIndex(index).setSource("k", "there")); + ensureSearchable(index); + forceMerge(client, index); + } + + private void forceMerge(Client client, String index) { + ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge(index).setFlush(true).get(); + OpenSearchAssertions.assertAllSuccessful(forceMergeResponse); + refreshAndWaitForReplication(); + } + + private void createCacheEntry(Client client, String index, String value) { + SearchResponse resp = client.prepareSearch(index).setRequestCache(true).setQuery(QueryBuilders.termQuery("k", value)).get(); + assertSearchResponse(resp); + OpenSearchAssertions.assertAllSuccessful(resp); + } + + private static void assertCacheState(Client client, String index, long expectedHits, long expectedMisses) { + RequestCacheStats requestCacheStats = getRequestCacheStats(client, index); + // Check the hit count and miss count together so if they are not + // correct we can see both values + assertEquals( + Arrays.asList(expectedHits, expectedMisses, 0L), + Arrays.asList(requestCacheStats.getHitCount(), requestCacheStats.getMissCount(), requestCacheStats.getEvictions()) + ); + + } + + private static RequestCacheStats getRequestCacheStats(Client client, String index) { + return client.admin().indices().prepareStats(index).setRequestCache(true).get().getTotal().getRequestCache(); + } + + private static RequestCacheStats getNodeCacheStats(Client client) { + NodesStatsResponse stats = client.admin().cluster().prepareNodesStats().execute().actionGet(); + for (NodeStats stat : stats.getNodes()) { + if (stat.getNode().isDataNode()) { + return stat.getIndices().getRequestCache(); + } + } + return null; + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java index 8db239f23692e..09d5c208a8756 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java @@ -37,7 +37,6 @@ import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; import org.opensearch.action.admin.cluster.node.stats.NodeStats; import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; -import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.opensearch.action.admin.indices.alias.Alias; import org.opensearch.action.admin.indices.cache.clear.ClearIndicesCacheRequest; import org.opensearch.action.admin.indices.forcemerge.ForceMergeResponse; @@ -55,10 +54,7 @@ import org.opensearch.core.index.Index; import org.opensearch.core.index.shard.ShardId; import org.opensearch.env.NodeEnvironment; -import org.opensearch.index.IndexNotFoundException; import org.opensearch.index.IndexSettings; -import org.opensearch.index.MergeSchedulerConfig; -import org.opensearch.index.TieredMergePolicyProvider; import org.opensearch.index.cache.request.RequestCacheStats; import org.opensearch.index.query.QueryBuilders; import org.opensearch.search.aggregations.bucket.global.GlobalAggregationBuilder; @@ -78,13 +74,10 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; -import java.util.concurrent.TimeUnit; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; import static org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING; -import static org.opensearch.indices.IndicesRequestCache.INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING; -import static org.opensearch.indices.IndicesService.INDICES_CACHE_CLEANUP_INTERVAL_SETTING_KEY; import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING; import static org.opensearch.search.aggregations.AggregationBuilders.dateHistogram; import static org.opensearch.search.aggregations.AggregationBuilders.dateRange; @@ -680,560 +673,6 @@ public void testProfileDisableCache() throws Exception { } } - public void testCacheWithInvalidation() throws Exception { - Client client = client(); - String index = "index"; - assertAcked( - client.admin() - .indices() - .prepareCreate(index) - .setMapping("k", "type=keyword") - .setSettings( - Settings.builder() - .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true) - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) - .put("index.refresh_interval", -1) - // Disable index refreshing to avoid cache being invalidated mid-test - .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(-1)) - ) - .get() - ); - indexRandom(true, client.prepareIndex(index).setSource("k", "hello")); - ensureSearchable(index); - // Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache - forceMerge(client, index); - SearchResponse resp = client.prepareSearch(index).setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get(); - assertSearchResponse(resp); - OpenSearchAssertions.assertAllSuccessful(resp); - assertThat(resp.getHits().getTotalHits().value, equalTo(1L)); - - assertCacheState(client, index, 0, 1); - // Index but don't refresh - indexRandom(false, client.prepareIndex(index).setSource("k", "hello2")); - resp = client.prepareSearch(index).setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get(); - assertSearchResponse(resp); - // Should expect hit as here as refresh didn't happen - assertCacheState(client, index, 1, 1); - - // Explicit refresh would invalidate cache - refreshAndWaitForReplication(); - // Hit same query again - resp = client.prepareSearch(index).setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get(); - assertSearchResponse(resp); - // Should expect miss as key has changed due to change in IndexReader.CacheKey (due to refresh) - assertCacheState(client, index, 1, 2); - } - - // calling cache clear api, when staleness threshold is lower than staleness, it should clean the stale keys from cache - public void testCacheClearAPIRemovesStaleKeysWhenStalenessThresholdIsLow() throws Exception { - String node = internalCluster().startNode( - Settings.builder() - .put(IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_STALENESS_THRESHOLD_SETTING_KEY, 0.10) - .put( - IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_INTERVAL_SETTING_KEY, - // setting intentionally high to avoid cache cleaner interfering - TimeValue.timeValueMillis(300) - ) - ); - Client client = client(node); - String index1 = "index1"; - String index2 = "index2"; - setupIndex(client, index1); - setupIndex(client, index2); - - // create first cache entry in index1 - createCacheEntry(client, index1, "hello"); - assertCacheState(client, index1, 0, 1); - long memorySizeForIndex1 = getRequestCacheStats(client, index1).getMemorySizeInBytes(); - assertTrue(memorySizeForIndex1 > 0); - - // create second cache entry in index1 - createCacheEntry(client, index1, "there"); - assertCacheState(client, index1, 0, 2); - long finalMemorySizeForIndex1 = getRequestCacheStats(client, index1).getMemorySizeInBytes(); - assertTrue(finalMemorySizeForIndex1 > memorySizeForIndex1); - - // create first cache entry in index2 - createCacheEntry(client, index2, "hello"); - assertCacheState(client, index2, 0, 1); - assertTrue(getRequestCacheStats(client, index2).getMemorySizeInBytes() > 0); - - ClearIndicesCacheRequest clearIndicesCacheRequest = new ClearIndicesCacheRequest(index2); - client.admin().indices().clearCache(clearIndicesCacheRequest).actionGet(); - - // cache cleaner should have cleaned up the stale key from index 2 - assertEquals(0, getRequestCacheStats(client, index2).getMemorySizeInBytes()); - // cache cleaner should NOT have cleaned from index 1 - assertEquals(finalMemorySizeForIndex1, getRequestCacheStats(client, index1).getMemorySizeInBytes()); - } - - // when staleness threshold is lower than staleness, it should clean the stale keys from cache - public void testStaleKeysCleanupWithLowThreshold() throws Exception { - int cacheCleanIntervalInMillis = 1; - String node = internalCluster().startNode( - Settings.builder() - .put(IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_STALENESS_THRESHOLD_SETTING_KEY, 0.10) - .put( - IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_INTERVAL_SETTING_KEY, - TimeValue.timeValueMillis(cacheCleanIntervalInMillis) - ) - ); - Client client = client(node); - String index1 = "index1"; - String index2 = "index2"; - setupIndex(client, index1); - setupIndex(client, index2); - - // create first cache entry in index1 - createCacheEntry(client, index1, "hello"); - assertCacheState(client, index1, 0, 1); - long memorySizeForIndex1 = getRequestCacheStats(client, index1).getMemorySizeInBytes(); - assertTrue(memorySizeForIndex1 > 0); - - // create second cache entry in index1 - createCacheEntry(client, index1, "there"); - assertCacheState(client, index1, 0, 2); - long finalMemorySizeForIndex1 = getRequestCacheStats(client, index1).getMemorySizeInBytes(); - assertTrue(finalMemorySizeForIndex1 > memorySizeForIndex1); - - // create first cache entry in index2 - createCacheEntry(client, index2, "hello"); - assertCacheState(client, index2, 0, 1); - assertTrue(getRequestCacheStats(client, index2).getMemorySizeInBytes() > 0); - - // force refresh so that it creates 1 stale key - flushAndRefresh(index2); - // sleep until cache cleaner would have cleaned up the stale key from index 2 - assertBusy(() -> { - // cache cleaner should have cleaned up the stale key from index 2 - assertEquals(0, getRequestCacheStats(client, index2).getMemorySizeInBytes()); - // cache cleaner should NOT have cleaned from index 1 - assertEquals(finalMemorySizeForIndex1, getRequestCacheStats(client, index1).getMemorySizeInBytes()); - }, cacheCleanIntervalInMillis * 2, TimeUnit.MILLISECONDS); - // sleep until cache cleaner would have cleaned up the stale key from index 2 - } - - // when staleness threshold is equal to staleness, it should clean the stale keys from cache - public void testCacheCleanupOnEqualStalenessAndThreshold() throws Exception { - int cacheCleanIntervalInMillis = 1; - String node = internalCluster().startNode( - Settings.builder() - .put(IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_STALENESS_THRESHOLD_SETTING_KEY, 0.33) - .put( - IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_INTERVAL_SETTING_KEY, - TimeValue.timeValueMillis(cacheCleanIntervalInMillis) - ) - ); - Client client = client(node); - String index1 = "index1"; - String index2 = "index2"; - setupIndex(client, index1); - setupIndex(client, index2); - - // create first cache entry in index1 - createCacheEntry(client, index1, "hello"); - assertCacheState(client, index1, 0, 1); - long memorySizeForIndex1 = getRequestCacheStats(client, index1).getMemorySizeInBytes(); - assertTrue(memorySizeForIndex1 > 0); - - // create second cache entry in index1 - createCacheEntry(client, index1, "there"); - assertCacheState(client, index1, 0, 2); - long finalMemorySizeForIndex1 = getRequestCacheStats(client, index1).getMemorySizeInBytes(); - assertTrue(finalMemorySizeForIndex1 > memorySizeForIndex1); - - // create first cache entry in index2 - createCacheEntry(client, index2, "hello"); - assertCacheState(client, index2, 0, 1); - assertTrue(getRequestCacheStats(client, index2).getMemorySizeInBytes() > 0); - - // force refresh so that it creates 1 stale key - flushAndRefresh(index2); - // sleep until cache cleaner would have cleaned up the stale key from index 2 - assertBusy(() -> { - // cache cleaner should have cleaned up the stale key from index 2 - assertEquals(0, getRequestCacheStats(client, index2).getMemorySizeInBytes()); - // cache cleaner should NOT have cleaned from index 1 - assertEquals(finalMemorySizeForIndex1, getRequestCacheStats(client, index1).getMemorySizeInBytes()); - }, cacheCleanIntervalInMillis * 2, TimeUnit.MILLISECONDS); - } - - // when staleness threshold is higher than staleness, it should NOT clean the cache - public void testCacheCleanupSkipsWithHighStalenessThreshold() throws Exception { - int cacheCleanIntervalInMillis = 1; - String node = internalCluster().startNode( - Settings.builder() - .put(IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_STALENESS_THRESHOLD_SETTING_KEY, 0.90) - .put( - IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_INTERVAL_SETTING_KEY, - TimeValue.timeValueMillis(cacheCleanIntervalInMillis) - ) - ); - Client client = client(node); - String index1 = "index1"; - String index2 = "index2"; - setupIndex(client, index1); - setupIndex(client, index2); - - // create first cache entry in index1 - createCacheEntry(client, index1, "hello"); - assertCacheState(client, index1, 0, 1); - long memorySizeForIndex1 = getRequestCacheStats(client, index1).getMemorySizeInBytes(); - assertTrue(memorySizeForIndex1 > 0); - - // create second cache entry in index1 - createCacheEntry(client, index1, "there"); - assertCacheState(client, index1, 0, 2); - long finalMemorySizeForIndex1 = getRequestCacheStats(client, index1).getMemorySizeInBytes(); - assertTrue(finalMemorySizeForIndex1 > memorySizeForIndex1); - - // create first cache entry in index2 - createCacheEntry(client, index2, "hello"); - assertCacheState(client, index2, 0, 1); - assertTrue(getRequestCacheStats(client, index2).getMemorySizeInBytes() > 0); - - // force refresh so that it creates 1 stale key - flushAndRefresh(index2); - // sleep until cache cleaner would have cleaned up the stale key from index 2 - assertBusy(() -> { - // cache cleaner should NOT have cleaned up the stale key from index 2 - assertTrue(getRequestCacheStats(client, index2).getMemorySizeInBytes() > 0); - // cache cleaner should NOT have cleaned from index 1 - assertEquals(finalMemorySizeForIndex1, getRequestCacheStats(client, index1).getMemorySizeInBytes()); - }, cacheCleanIntervalInMillis * 2, TimeUnit.MILLISECONDS); - } - - // when staleness threshold is explicitly set to 0, cache cleaner regularly cleans up stale keys. - public void testCacheCleanupOnZeroStalenessThreshold() throws Exception { - int cacheCleanIntervalInMillis = 50; - String node = internalCluster().startNode( - Settings.builder() - .put(IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_STALENESS_THRESHOLD_SETTING_KEY, 0) - .put( - IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_INTERVAL_SETTING_KEY, - TimeValue.timeValueMillis(cacheCleanIntervalInMillis) - ) - ); - Client client = client(node); - String index1 = "index1"; - String index2 = "index2"; - setupIndex(client, index1); - setupIndex(client, index2); - - // create 10 index1 cache entries - for (int i = 1; i <= 10; i++) { - long cacheSizeBefore = getRequestCacheStats(client, index1).getMemorySizeInBytes(); - createCacheEntry(client, index1, "hello" + i); - assertCacheState(client, index1, 0, i); - long cacheSizeAfter = getRequestCacheStats(client, index1).getMemorySizeInBytes(); - assertTrue(cacheSizeAfter > cacheSizeBefore); - } - - long finalMemorySizeForIndex1 = getRequestCacheStats(client, index1).getMemorySizeInBytes(); - - // create first cache entry in index2 - createCacheEntry(client, index2, "hello"); - assertCacheState(client, index2, 0, 1); - assertTrue(getRequestCacheStats(client, index2).getMemorySizeInBytes() > 0); - - // force refresh so that it creates 1 stale key - flushAndRefresh(index2); - // sleep until cache cleaner would have cleaned up the stale key from index 2 - assertBusy(() -> { - // cache cleaner should have cleaned up the stale key from index 2 - assertEquals(0, getRequestCacheStats(client, index2).getMemorySizeInBytes()); - // cache cleaner should NOT have cleaned from index 1 - assertEquals(finalMemorySizeForIndex1, getRequestCacheStats(client, index1).getMemorySizeInBytes()); - }, cacheCleanIntervalInMillis * 2, TimeUnit.MILLISECONDS); - } - - // when staleness threshold is not explicitly set, cache cleaner regularly cleans up stale keys - public void testStaleKeysRemovalWithoutExplicitThreshold() throws Exception { - int cacheCleanIntervalInMillis = 1; - String node = internalCluster().startNode( - Settings.builder() - .put( - IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_INTERVAL_SETTING_KEY, - TimeValue.timeValueMillis(cacheCleanIntervalInMillis) - ) - ); - String index1 = "index1"; - String index2 = "index2"; - Client client = client(node); - setupIndex(client, index1); - setupIndex(client, index2); - - // create first cache entry in index1 - createCacheEntry(client, index1, "hello"); - assertCacheState(client, index1, 0, 1); - long memorySizeForIndex1 = getRequestCacheStats(client, index1).getMemorySizeInBytes(); - assertTrue(memorySizeForIndex1 > 0); - - // create second cache entry in index1 - createCacheEntry(client, index1, "there"); - assertCacheState(client, index1, 0, 2); - long finalMemorySizeForIndex1 = getRequestCacheStats(client, index1).getMemorySizeInBytes(); - assertTrue(finalMemorySizeForIndex1 > memorySizeForIndex1); - - // create first cache entry in index2 - createCacheEntry(client, index2, "hello"); - assertCacheState(client, index2, 0, 1); - assertTrue(getRequestCacheStats(client, index2).getMemorySizeInBytes() > 0); - - // force refresh so that it creates 1 stale key - flushAndRefresh(index2); - // sleep until cache cleaner would have cleaned up the stale key from index 2 - assertBusy(() -> { - // cache cleaner should have cleaned up the stale key from index 2 - assertEquals(0, getRequestCacheStats(client, index2).getMemorySizeInBytes()); - // cache cleaner should NOT have cleaned from index 1 - assertEquals(finalMemorySizeForIndex1, getRequestCacheStats(client, index1).getMemorySizeInBytes()); - }, cacheCleanIntervalInMillis * 2, TimeUnit.MILLISECONDS); - } - - // when cache cleaner interval setting is not set, cache cleaner is configured appropriately with the fall-back setting - public void testCacheCleanupWithDefaultSettings() throws Exception { - int cacheCleanIntervalInMillis = 1; - String node = internalCluster().startNode( - Settings.builder().put(INDICES_CACHE_CLEANUP_INTERVAL_SETTING_KEY, TimeValue.timeValueMillis(cacheCleanIntervalInMillis)) - ); - Client client = client(node); - String index1 = "index1"; - String index2 = "index2"; - setupIndex(client, index1); - setupIndex(client, index2); - - // create first cache entry in index1 - createCacheEntry(client, index1, "hello"); - assertCacheState(client, index1, 0, 1); - long memorySizeForIndex1 = getRequestCacheStats(client, index1).getMemorySizeInBytes(); - assertTrue(memorySizeForIndex1 > 0); - - // create second cache entry in index1 - createCacheEntry(client, index1, "there"); - assertCacheState(client, index1, 0, 2); - long finalMemorySizeForIndex1 = getRequestCacheStats(client, index1).getMemorySizeInBytes(); - assertTrue(finalMemorySizeForIndex1 > memorySizeForIndex1); - - // create first cache entry in index2 - createCacheEntry(client, index2, "hello"); - assertCacheState(client, index2, 0, 1); - assertTrue(getRequestCacheStats(client, index2).getMemorySizeInBytes() > 0); - - // force refresh so that it creates 1 stale key - flushAndRefresh(index2); - // sleep until cache cleaner would have cleaned up the stale key from index 2 - assertBusy(() -> { - // cache cleaner should have cleaned up the stale key from index 2 - assertEquals(0, getRequestCacheStats(client, index2).getMemorySizeInBytes()); - // cache cleaner should NOT have cleaned from index 1 - assertEquals(finalMemorySizeForIndex1, getRequestCacheStats(client, index1).getMemorySizeInBytes()); - }, cacheCleanIntervalInMillis * 2, TimeUnit.MILLISECONDS); - } - - // staleness threshold updates flows through to the cache cleaner - public void testDynamicStalenessThresholdUpdate() throws Exception { - int cacheCleanIntervalInMillis = 1; - String node = internalCluster().startNode( - Settings.builder() - .put(IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_STALENESS_THRESHOLD_SETTING_KEY, 0.90) - .put( - IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_INTERVAL_SETTING_KEY, - TimeValue.timeValueMillis(cacheCleanIntervalInMillis) - ) - ); - Client client = client(node); - String index1 = "index1"; - String index2 = "index2"; - setupIndex(client, index1); - setupIndex(client, index2); - - // create first cache entry in index1 - createCacheEntry(client, index1, "hello"); - assertCacheState(client, index1, 0, 1); - long memorySizeForIndex1 = getRequestCacheStats(client, index1).getMemorySizeInBytes(); - assertTrue(memorySizeForIndex1 > 0); - - // create second cache entry in index1 - createCacheEntry(client, index1, "there"); - assertCacheState(client, index1, 0, 2); - assertTrue(getRequestCacheStats(client, index1).getMemorySizeInBytes() > memorySizeForIndex1); - - // create first cache entry in index2 - createCacheEntry(client, index2, "hello"); - assertCacheState(client, index2, 0, 1); - long finalMemorySizeForIndex1 = getRequestCacheStats(client, index1).getMemorySizeInBytes(); - assertTrue(finalMemorySizeForIndex1 > 0); - - // force refresh so that it creates 1 stale key - flushAndRefresh(index2); - assertBusy(() -> { - // cache cleaner should NOT have cleaned up the stale key from index 2 - assertTrue(getRequestCacheStats(client, index2).getMemorySizeInBytes() > 0); - }, cacheCleanIntervalInMillis * 2, TimeUnit.MILLISECONDS); - - // Update indices.requests.cache.cleanup.staleness_threshold to "10%" - ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); - updateSettingsRequest.persistentSettings(Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), 0.10)); - assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); - - assertBusy(() -> { - // cache cleaner should have cleaned up the stale key from index 2 - assertEquals(0, getRequestCacheStats(client, index2).getMemorySizeInBytes()); - // cache cleaner should NOT have cleaned from index 1 - assertEquals(finalMemorySizeForIndex1, getRequestCacheStats(client, index1).getMemorySizeInBytes()); - }, cacheCleanIntervalInMillis * 2, TimeUnit.MILLISECONDS); - } - - // staleness threshold dynamic updates should throw exceptions on invalid input - public void testInvalidStalenessThresholdUpdateThrowsException() throws Exception { - // Update indices.requests.cache.cleanup.staleness_threshold to "10%" with illegal argument - assertThrows("Ratio should be in [0-1.0]", IllegalArgumentException.class, () -> { - ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); - updateSettingsRequest.persistentSettings( - Settings.builder().put(IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_STALENESS_THRESHOLD_SETTING_KEY, 10) - ); - client().admin().cluster().updateSettings(updateSettingsRequest).actionGet(); - }); - } - - // closing the Index after caching will clean up from Indices Request Cache - public void testCacheClearanceAfterIndexClosure() throws Exception { - int cacheCleanIntervalInMillis = 100; - String node = internalCluster().startNode( - Settings.builder() - .put(IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_STALENESS_THRESHOLD_SETTING_KEY, 0.10) - .put( - IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_INTERVAL_SETTING_KEY, - TimeValue.timeValueMillis(cacheCleanIntervalInMillis) - ) - ); - Client client = client(node); - String index = "index"; - setupIndex(client, index); - - // assert there are no entries in the cache for index - assertEquals(0, getRequestCacheStats(client, index).getMemorySizeInBytes()); - // assert there are no entries in the cache from other indices in the node - assertEquals(0, getNodeCacheStats(client).getMemorySizeInBytes()); - // create first cache entry in index - createCacheEntry(client, index, "hello"); - assertCacheState(client, index, 0, 1); - assertTrue(getRequestCacheStats(client, index).getMemorySizeInBytes() > 0); - assertTrue(getNodeCacheStats(client).getMemorySizeInBytes() > 0); - - // close index - assertAcked(client.admin().indices().prepareClose(index)); - // request cache stats cannot be access since Index should be closed - try { - getRequestCacheStats(client, index); - } catch (Exception e) { - assert (e instanceof IndexClosedException); - } - // sleep until cache cleaner would have cleaned up the stale key from index - assertBusy(() -> { - // cache cleaner should have cleaned up the stale keys from index - assertEquals(0, getNodeCacheStats(client).getMemorySizeInBytes()); - }, cacheCleanIntervalInMillis * 2, TimeUnit.MILLISECONDS); - } - - // deleting the Index after caching will clean up from Indices Request Cache - public void testCacheCleanupAfterIndexDeletion() throws Exception { - int cacheCleanIntervalInMillis = 100; - String node = internalCluster().startNode( - Settings.builder() - .put(IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_STALENESS_THRESHOLD_SETTING_KEY, 0.10) - .put( - IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_INTERVAL_SETTING_KEY, - TimeValue.timeValueMillis(cacheCleanIntervalInMillis) - ) - ); - Client client = client(node); - String index = "index"; - setupIndex(client, index); - - // assert there are no entries in the cache for index - assertEquals(0, getRequestCacheStats(client, index).getMemorySizeInBytes()); - // assert there are no entries in the cache from other indices in the node - assertEquals(0, getNodeCacheStats(client).getMemorySizeInBytes()); - // create first cache entry in index - createCacheEntry(client, index, "hello"); - assertCacheState(client, index, 0, 1); - assertTrue(getRequestCacheStats(client, index).getMemorySizeInBytes() > 0); - assertTrue(getNodeCacheStats(client).getMemorySizeInBytes() > 0); - - // delete index - assertAcked(client.admin().indices().prepareDelete(index)); - // request cache stats cannot be access since Index should be deleted - try { - getRequestCacheStats(client, index); - } catch (Exception e) { - assert (e instanceof IndexNotFoundException); - } - - // sleep until cache cleaner would have cleaned up the stale key from index - assertBusy(() -> { - // cache cleaner should have cleaned up the stale keys from index - assertEquals(0, getNodeCacheStats(client).getMemorySizeInBytes()); - }, cacheCleanIntervalInMillis * 2, TimeUnit.MILLISECONDS); - } - - // when staleness threshold is lower than staleness, it should clean the cache from all indices having stale keys - public void testStaleKeysCleanupWithMultipleIndices() throws Exception { - int cacheCleanIntervalInMillis = 10; - String node = internalCluster().startNode( - Settings.builder() - .put(IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_STALENESS_THRESHOLD_SETTING_KEY, 0.10) - .put( - IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_INTERVAL_SETTING_KEY, - TimeValue.timeValueMillis(cacheCleanIntervalInMillis) - ) - ); - Client client = client(node); - String index1 = "index1"; - String index2 = "index2"; - setupIndex(client, index1); - setupIndex(client, index2); - - // assert cache is empty for index1 - assertEquals(0, getRequestCacheStats(client, index1).getMemorySizeInBytes()); - // create first cache entry in index1 - createCacheEntry(client, index1, "hello"); - assertCacheState(client, index1, 0, 1); - long memorySizeForIndex1With1Entries = getRequestCacheStats(client, index1).getMemorySizeInBytes(); - assertTrue(memorySizeForIndex1With1Entries > 0); - - // create second cache entry in index1 - createCacheEntry(client, index1, "there"); - assertCacheState(client, index1, 0, 2); - long memorySizeForIndex1With2Entries = getRequestCacheStats(client, index1).getMemorySizeInBytes(); - assertTrue(memorySizeForIndex1With2Entries > memorySizeForIndex1With1Entries); - - // assert cache is empty for index2 - assertEquals(0, getRequestCacheStats(client, index2).getMemorySizeInBytes()); - // create first cache entry in index2 - createCacheEntry(client, index2, "hello"); - assertCacheState(client, index2, 0, 1); - assertTrue(getRequestCacheStats(client, index2).getMemorySizeInBytes() > 0); - - // force refresh both index1 and index2 - flushAndRefresh(index1, index2); - // create another cache entry in index 1 same as memorySizeForIndex1With1Entries, this should not be cleaned up. - createCacheEntry(client, index1, "hello"); - // sleep until cache cleaner would have cleaned up the stale key from index2 - assertBusy(() -> { - // cache cleaner should have cleaned up the stale key from index2 and hence cache should be empty - assertEquals(0, getRequestCacheStats(client, index2).getMemorySizeInBytes()); - // cache cleaner should have only cleaned up the stale entities for index1 - long currentMemorySizeInBytesForIndex1 = getRequestCacheStats(client, index1).getMemorySizeInBytes(); - // assert the memory size of index1 to only contain 1 entry added after flushAndRefresh - assertEquals(memorySizeForIndex1With1Entries, currentMemorySizeInBytesForIndex1); - // cache for index1 should not be empty since there was an item cached after flushAndRefresh - assertTrue(currentMemorySizeInBytesForIndex1 > 0); - }, cacheCleanIntervalInMillis * 2, TimeUnit.MILLISECONDS); - } - public void testDeleteAndCreateSameIndexShardOnSameNode() throws Exception { String node_1 = internalCluster().startNode(Settings.builder().build()); Client client = client(node_1); @@ -1337,47 +776,12 @@ private Path[] shardDirectory(String server, Index index, int shard) { return paths; } - private void setupIndex(Client client, String index) throws Exception { - assertAcked( - client.admin() - .indices() - .prepareCreate(index) - .setMapping("k", "type=keyword") - .setSettings( - Settings.builder() - .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true) - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) - // Disable index refreshing to avoid cache being invalidated mid-test - .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(-1)) - // settings to reduce the chances of background segment merges invalidating the cache - .put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING.getKey(), 1000d) - .put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.getKey(), 2) - .put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGED_SEGMENT_SETTING.getKey(), "100gb") - .put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), 1) - .put(MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING.getKey(), 1) - ) - .get() - ); - - indexRandom(true, client.prepareIndex(index).setSource("k", "hello")); - indexRandom(true, client.prepareIndex(index).setSource("k", "there")); - ensureSearchable(index); - forceMerge(client, index); - } - private void forceMerge(Client client, String index) { ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge(index).setFlush(true).get(); OpenSearchAssertions.assertAllSuccessful(forceMergeResponse); refreshAndWaitForReplication(); } - private void createCacheEntry(Client client, String index, String value) { - SearchResponse resp = client.prepareSearch(index).setRequestCache(true).setQuery(QueryBuilders.termQuery("k", value)).get(); - assertSearchResponse(resp); - OpenSearchAssertions.assertAllSuccessful(resp); - } - private static void assertCacheState(Client client, String index, long expectedHits, long expectedMisses) { RequestCacheStats requestCacheStats = getRequestCacheStats(client, index); // Check the hit count and miss count together so if they are not From 3c4e2a3d36239cff77504bcb216ae93ea5652abe Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Wed, 19 Jun 2024 12:54:44 -0700 Subject: [PATCH 6/6] add segment counts Signed-off-by: Kiran Prakash --- .../indices/IndicesRequestCacheCleanupIT.java | 66 ++++++++++++++----- 1 file changed, 50 insertions(+), 16 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheCleanupIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheCleanupIT.java index 6ccd453ee0ac5..6f698c5816278 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheCleanupIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheCleanupIT.java @@ -90,7 +90,7 @@ public void testCacheWithInvalidation() throws Exception { ) .get() ); - indexRandom(true, client.prepareIndex(index).setSource("k", "hello")); + indexRandom(false, client.prepareIndex(index).setSource("k", "hello")); ensureSearchable(index); // Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache forceMerge(client, index); @@ -107,6 +107,8 @@ public void testCacheWithInvalidation() throws Exception { // Should expect hit as here as refresh didn't happen assertCacheState(client, index, 1, 1); + // assert segment counts stay the same + assertEquals(1, getSegmentCount(client, index)); // Explicit refresh would invalidate cache refreshAndWaitForReplication(); // Hit same query again @@ -153,6 +155,9 @@ public void testCacheClearAPIRemovesStaleKeysWhenStalenessThresholdIsLow() throw ClearIndicesCacheRequest clearIndicesCacheRequest = new ClearIndicesCacheRequest(index2); client.admin().indices().clearCache(clearIndicesCacheRequest).actionGet(); + // assert segment counts stay the same + assertEquals(1, getSegmentCount(client, index1)); + assertEquals(1, getSegmentCount(client, index2)); // cache cleaner should have cleaned up the stale key from index 2 assertEquals(0, getRequestCacheStats(client, index2).getMemorySizeInBytes()); // cache cleaner should NOT have cleaned from index 1 @@ -198,6 +203,9 @@ public void testStaleKeysCleanupWithLowThreshold() throws Exception { forceMerge(client, index2); // sleep until cache cleaner would have cleaned up the stale key from index 2 assertBusy(() -> { + // assert segment counts stay the same + assertEquals(1, getSegmentCount(client, index1)); + assertEquals(2, getSegmentCount(client, index2)); // cache cleaner should have cleaned up the stale key from index 2 assertEquals(0, getRequestCacheStats(client, index2).getMemorySizeInBytes()); // cache cleaner should NOT have cleaned from index 1 @@ -245,6 +253,9 @@ public void testCacheCleanupOnEqualStalenessAndThreshold() throws Exception { forceMerge(client, index2); // sleep until cache cleaner would have cleaned up the stale key from index 2 assertBusy(() -> { + // assert segment counts stay the same + assertEquals(1, getSegmentCount(client, index1)); + assertEquals(2, getSegmentCount(client, index2)); // cache cleaner should have cleaned up the stale key from index 2 assertEquals(0, getRequestCacheStats(client, index2).getMemorySizeInBytes()); // cache cleaner should NOT have cleaned from index 1 @@ -290,6 +301,9 @@ public void testCacheCleanupSkipsWithHighStalenessThreshold() throws Exception { flushAndRefresh(index2); // sleep until cache cleaner would have cleaned up the stale key from index 2 assertBusy(() -> { + // assert segment counts stay the same + assertEquals(1, getSegmentCount(client, index1)); + assertEquals(1, getSegmentCount(client, index2)); // cache cleaner should NOT have cleaned up the stale key from index 2 assertTrue(getRequestCacheStats(client, index2).getMemorySizeInBytes() > 0); // cache cleaner should NOT have cleaned from index 1 @@ -335,6 +349,9 @@ public void testCacheCleanupOnZeroStalenessThreshold() throws Exception { forceMerge(client, index2); // sleep until cache cleaner would have cleaned up the stale key from index 2 assertBusy(() -> { + // assert segment counts stay the same + assertEquals(1, getSegmentCount(client, index1)); + assertEquals(2, getSegmentCount(client, index2)); // cache cleaner should have cleaned up the stale key from index 2 assertEquals(0, getRequestCacheStats(client, index2).getMemorySizeInBytes()); // cache cleaner should NOT have cleaned from index 1 @@ -380,6 +397,7 @@ public void testStaleKeysRemovalWithoutExplicitThreshold() throws Exception { forceMerge(client, index2); // sleep until cache cleaner would have cleaned up the stale key from index 2 assertBusy(() -> { + assertEquals(1, getSegmentCount(client, index1)); assertEquals(2, getSegmentCount(client, index2)); // cache cleaner should have cleaned up the stale key from index 2 assertEquals(0, getRequestCacheStats(client, index2).getMemorySizeInBytes()); @@ -388,19 +406,6 @@ public void testStaleKeysRemovalWithoutExplicitThreshold() throws Exception { }, cacheCleanIntervalInMillis * 2, TimeUnit.MILLISECONDS); } - private int getSegmentCount(Client client, String indexName) { - return client.admin() - .indices() - .segments(new IndicesSegmentsRequest(indexName)) - .actionGet() - .getIndices() - .get(indexName) - .getShards() - .get(0) - .getShards()[0].getSegments() - .size(); - } - // when cache cleaner interval setting is not set, cache cleaner is configured appropriately with the fall-back setting public void testCacheCleanupWithDefaultSettings() throws Exception { int cacheCleanIntervalInMillis = 1; @@ -435,6 +440,9 @@ public void testCacheCleanupWithDefaultSettings() throws Exception { forceMerge(client, index2); // sleep until cache cleaner would have cleaned up the stale key from index 2 assertBusy(() -> { + // assert segment counts stay the same + assertEquals(1, getSegmentCount(client, index1)); + assertEquals(2, getSegmentCount(client, index2)); // cache cleaner should have cleaned up the stale key from index 2 assertEquals(0, getRequestCacheStats(client, index2).getMemorySizeInBytes()); // cache cleaner should NOT have cleaned from index 1 @@ -490,6 +498,9 @@ public void testDynamicStalenessThresholdUpdate() throws Exception { assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); assertBusy(() -> { + // assert segment counts stay the same + assertEquals(1, getSegmentCount(client, index1)); + assertEquals(2, getSegmentCount(client, index2)); // cache cleaner should have cleaned up the stale key from index 2 assertEquals(0, getRequestCacheStats(client, index2).getMemorySizeInBytes()); // cache cleaner should NOT have cleaned from index 1 @@ -631,21 +642,31 @@ public void testStaleKeysCleanupWithMultipleIndices() throws Exception { // invalidate the cache for index1 indexRandom(false, client.prepareIndex(index1).setId("1").setSource("d", "hello")); forceMerge(client, index1); + // Assert cache is cleared up + assertBusy( + () -> { assertEquals(0, getRequestCacheStats(client, index1).getMemorySizeInBytes()); }, + cacheCleanIntervalInMillis * 2, + TimeUnit.MILLISECONDS + ); + // invalidate the cache for index2 indexRandom(false, client.prepareIndex(index2).setId("1").setSource("d", "hello")); forceMerge(client, index2); + // create another cache entry in index 1 same as memorySizeForIndex1With1Entries, this should not be cleaned up. createCacheEntry(client, index1, "hello"); + // sleep until cache cleaner would have cleaned up the stale key from index2 assertBusy(() -> { + // assert segment counts stay the same + assertEquals(2, getSegmentCount(client, index1)); + assertEquals(2, getSegmentCount(client, index2)); // cache cleaner should have cleaned up the stale key from index2 and hence cache should be empty assertEquals(0, getRequestCacheStats(client, index2).getMemorySizeInBytes()); // cache cleaner should have only cleaned up the stale entities for index1 long currentMemorySizeInBytesForIndex1 = getRequestCacheStats(client, index1).getMemorySizeInBytes(); // assert the memory size of index1 to only contain 1 entry added after flushAndRefresh assertEquals(memorySizeForIndex1With1Entries, currentMemorySizeInBytesForIndex1); - // cache for index1 should not be empty since there was an item cached after flushAndRefresh - assertTrue(currentMemorySizeInBytesForIndex1 > 0); }, cacheCleanIntervalInMillis * 2, TimeUnit.MILLISECONDS); } @@ -673,6 +694,19 @@ private void setupIndex(Client client, String index) throws Exception { forceMerge(client, index); } + private int getSegmentCount(Client client, String indexName) { + return client.admin() + .indices() + .segments(new IndicesSegmentsRequest(indexName)) + .actionGet() + .getIndices() + .get(indexName) + .getShards() + .get(0) + .getShards()[0].getSegments() + .size(); + } + private void forceMerge(Client client, String index) { ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge(index).setFlush(true).get(); OpenSearchAssertions.assertAllSuccessful(forceMergeResponse);