From a2cf155e14ff0bd8d760b222d41892734e81ab4a Mon Sep 17 00:00:00 2001
From: Kiran Prakash
Date: Mon, 29 Apr 2024 11:56:50 -0700
Subject: [PATCH] [Backport 2.x][Tiered Caching] Make Indices Request Cache Stale Key Mgmt Threshold … (#13413)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* [Tiered Caching] Make Indices Request Cache Stale Key Mgmt Threshold setting dynamic (#12941)

* Update IndicesRequestCache.java
Signed-off-by: Kiran Prakash
* Update ClusterSettings.java
Signed-off-by: Kiran Prakash
* Update IndicesRequestCacheIT.java
Signed-off-by: Kiran Prakash
* spotless
Signed-off-by: Kiran Prakash
* Update IndicesRequestCacheIT.java
Signed-off-by: Kiran Prakash
* some refactoring
Signed-off-by: Kiran Prakash
* Update IndicesRequestCache.java
Signed-off-by: Kiran Prakash
* address existing tests
Signed-off-by: Kiran Prakash
* UTs
Signed-off-by: Kiran Prakash
* Update CHANGELOG.md
Signed-off-by: Kiran Prakash
* ITs
Signed-off-by: Kiran Prakash
* spotless
Signed-off-by: Kiran Prakash
* refactor
Signed-off-by: Kiran Prakash
* Update IndicesRequestCacheIT.java
Signed-off-by: Kiran Prakash
* Update IndicesRequestCacheIT.java
Signed-off-by: Kiran Prakash
* Update IndicesRequestCacheIT.java
Signed-off-by: Kiran Prakash
* Update IndicesRequestCacheIT.java
Signed-off-by: Kiran Prakash
* Update CHANGELOG.md
Signed-off-by: Kiran Prakash
* Update CHANGELOG.md
Signed-off-by: Kiran Prakash
* Update IndicesRequestCacheIT.java
Signed-off-by: Kiran Prakash
* Update IndicesRequestCache.java
Signed-off-by: Kiran Prakash
* Update IndicesRequestCacheIT.java
Signed-off-by: Kiran Prakash
* resolve conflicts
Signed-off-by: Kiran Prakash
* address code comments
Signed-off-by: Kiran Prakash
* address code comments
Signed-off-by: Kiran Prakash
* Update IndicesRequestCacheIT.java
Signed-off-by: Kiran Prakash
* rename tests
Signed-off-by: Kiran Prakash
* Update IndicesRequestCacheIT.java
Signed-off-by: Kiran Prakash
* Update IndicesRequestCacheIT.java
Signed-off-by: Kiran Prakash
* resolve conflicts
Signed-off-by: Kiran Prakash
* Update IndicesRequestCache.java
Signed-off-by: Kiran Prakash
* code comments
Signed-off-by: Kiran Prakash
* Update IndicesRequestCacheIT.java
Signed-off-by: Kiran Prakash

---------
Signed-off-by: Kiran Prakash
(cherry picked from commit f7984974963d745ca3cb88460251792acd7b326d)

* resolve conflicts
Signed-off-by: Kiran Prakash
* remove typo
Signed-off-by: Kiran Prakash
* Update IndicesRequestCache.java
Signed-off-by: Kiran Prakash
* Update IndicesRequestCache.java
Signed-off-by: Kiran Prakash
* Update IndicesRequestCache.java
Signed-off-by: Kiran Prakash
* Update IndicesRequestCache.java
Signed-off-by: Kiran Prakash
* Update IndicesRequestCache.java
Signed-off-by: Kiran Prakash
* Update IndicesRequestCache.java
Signed-off-by: Kiran Prakash

---------
Signed-off-by: Kiran Prakash
---
 CHANGELOG.md | 1 +
 .../indices/IndicesRequestCacheIT.java | 844 +++++++++++++++---
 .../common/settings/ClusterSettings.java | 2 +
 .../indices/IndicesRequestCache.java | 40 +-
 .../opensearch/indices/IndicesService.java | 3 +-
 .../indices/IndicesRequestCacheTests.java | 34 +
 6 files changed, 785 insertions(+), 139 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index fd6f72dcc2b70..2568cc52087d2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -25,6 +25,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - Add an individual setting of rate limiter for segment
replication ([#12959](https://github.com/opensearch-project/OpenSearch/pull/12959)) - [Streaming Indexing] Ensure support of the new transport by security plugin ([#13174](https://github.com/opensearch-project/OpenSearch/pull/13174)) - Add cluster setting to dynamically configure the buckets for filter rewrite optimization. ([#13179](https://github.com/opensearch-project/OpenSearch/pull/13179)) +- [Tiered caching] Make Indices Request Cache Stale Key Mgmt Threshold setting dynamic ([#12941](https://github.com/opensearch-project/OpenSearch/pull/12941)) - Make search query counters dynamic to support all query types ([#12601](https://github.com/opensearch-project/OpenSearch/pull/12601)) - [Tiered Caching] Gate new stats logic behind FeatureFlags.PLUGGABLE_CACHE ([#13238](https://github.com/opensearch-project/OpenSearch/pull/13238)) - [Tiered Caching] Add a dynamic setting to disable/enable disk cache. ([#13373](https://github.com/opensearch-project/OpenSearch/pull/13373)) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java index 52b4dad553180..bd7f2f0588c90 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java @@ -34,7 +34,11 @@ import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; +import org.opensearch.action.admin.cluster.node.stats.NodeStats; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.opensearch.action.admin.indices.alias.Alias; +import org.opensearch.action.admin.indices.cache.clear.ClearIndicesCacheRequest; import org.opensearch.action.admin.indices.forcemerge.ForceMergeResponse; import org.opensearch.action.search.SearchResponse; import org.opensearch.action.search.SearchType; @@ -42,13 +46,16 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; import org.opensearch.common.time.DateFormatter; +import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.FeatureFlags; +import org.opensearch.index.IndexNotFoundException; import org.opensearch.index.cache.request.RequestCacheStats; import org.opensearch.index.query.QueryBuilders; import org.opensearch.search.aggregations.bucket.global.GlobalAggregationBuilder; import org.opensearch.search.aggregations.bucket.histogram.DateHistogramInterval; import org.opensearch.search.aggregations.bucket.histogram.Histogram; import org.opensearch.search.aggregations.bucket.histogram.Histogram.Bucket; +import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase; import org.opensearch.test.hamcrest.OpenSearchAssertions; @@ -59,7 +66,12 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.concurrent.TimeUnit; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; +import static org.opensearch.indices.IndicesRequestCache.INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING; +import static org.opensearch.indices.IndicesService.INDICES_CACHE_CLEANUP_INTERVAL_SETTING_KEY; import static 
org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING; import static org.opensearch.search.aggregations.AggregationBuilders.dateHistogram; import static org.opensearch.search.aggregations.AggregationBuilders.dateRange; @@ -69,6 +81,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, supportsDedicatedMasters = false) public class IndicesRequestCacheIT extends ParameterizedStaticSettingsOpenSearchIntegTestCase { public IndicesRequestCacheIT(Settings settings) { super(settings); @@ -92,25 +105,31 @@ protected boolean useRandomReplicationStrategy() { // One of the primary purposes of the query cache is to cache aggs results public void testCacheAggs() throws Exception { Client client = client(); + String index = "index"; assertAcked( client.admin() .indices() - .prepareCreate("index") + .prepareCreate(index) .setMapping("f", "type=date") - .setSettings(Settings.builder().put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true)) + .setSettings( + Settings.builder() + .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true) + .put(SETTING_NUMBER_OF_SHARDS, 1) + .put(SETTING_NUMBER_OF_REPLICAS, 0) + ) .get() ); indexRandom( true, - client.prepareIndex("index").setSource("f", "2014-03-10T00:00:00.000Z"), - client.prepareIndex("index").setSource("f", "2014-05-13T00:00:00.000Z") + client.prepareIndex(index).setSource("f", "2014-03-10T00:00:00.000Z"), + client.prepareIndex(index).setSource("f", "2014-05-13T00:00:00.000Z") ); - ensureSearchable("index"); + ensureSearchable(index); // This is not a random example: serialization with time zones writes shared strings // which used to not work well with the query cache because of the handles stream output // see #9500 - final SearchResponse r1 = client.prepareSearch("index") + final SearchResponse r1 = client.prepareSearch(index) .setSize(0) .setSearchType(SearchType.QUERY_THEN_FETCH) .addAggregation( @@ -124,12 +143,12 @@ public void testCacheAggs() throws Exception { // The cached is actually used assertThat( - client.admin().indices().prepareStats("index").setRequestCache(true).get().getTotal().getRequestCache().getMemorySizeInBytes(), + client.admin().indices().prepareStats(index).setRequestCache(true).get().getTotal().getRequestCache().getMemorySizeInBytes(), greaterThan(0L) ); for (int i = 0; i < 10; ++i) { - final SearchResponse r2 = client.prepareSearch("index") + final SearchResponse r2 = client.prepareSearch(index) .setSize(0) .setSearchType(SearchType.QUERY_THEN_FETCH) .addAggregation( @@ -156,10 +175,11 @@ public void testCacheAggs() throws Exception { public void testQueryRewrite() throws Exception { Client client = client(); + String index = "index"; assertAcked( client.admin() .indices() - .prepareCreate("index") + .prepareCreate(index) .setMapping("s", "type=date") .setSettings( Settings.builder() @@ -172,28 +192,28 @@ public void testQueryRewrite() throws Exception { ); indexRandom( true, - client.prepareIndex("index").setId("1").setRouting("1").setSource("s", "2016-03-19"), - client.prepareIndex("index").setId("2").setRouting("1").setSource("s", "2016-03-20"), - client.prepareIndex("index").setId("3").setRouting("1").setSource("s", "2016-03-21"), - client.prepareIndex("index").setId("4").setRouting("2").setSource("s", "2016-03-22"), - client.prepareIndex("index").setId("5").setRouting("2").setSource("s", "2016-03-23"), - 
client.prepareIndex("index").setId("6").setRouting("2").setSource("s", "2016-03-24"), - client.prepareIndex("index").setId("7").setRouting("3").setSource("s", "2016-03-25"), - client.prepareIndex("index").setId("8").setRouting("3").setSource("s", "2016-03-26"), - client.prepareIndex("index").setId("9").setRouting("3").setSource("s", "2016-03-27") + client.prepareIndex(index).setId("1").setRouting("1").setSource("s", "2016-03-19"), + client.prepareIndex(index).setId("2").setRouting("1").setSource("s", "2016-03-20"), + client.prepareIndex(index).setId("3").setRouting("1").setSource("s", "2016-03-21"), + client.prepareIndex(index).setId("4").setRouting("2").setSource("s", "2016-03-22"), + client.prepareIndex(index).setId("5").setRouting("2").setSource("s", "2016-03-23"), + client.prepareIndex(index).setId("6").setRouting("2").setSource("s", "2016-03-24"), + client.prepareIndex(index).setId("7").setRouting("3").setSource("s", "2016-03-25"), + client.prepareIndex(index).setId("8").setRouting("3").setSource("s", "2016-03-26"), + client.prepareIndex(index).setId("9").setRouting("3").setSource("s", "2016-03-27") ); - ensureSearchable("index"); - assertCacheState(client, "index", 0, 0); + ensureSearchable(index); + assertCacheState(client, index, 0, 0); // Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache - ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge("index").setFlush(true).get(); + ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge(index).setFlush(true).get(); OpenSearchAssertions.assertAllSuccessful(forceMergeResponse); refreshAndWaitForReplication(); - ensureSearchable("index"); + ensureSearchable(index); - assertCacheState(client, "index", 0, 0); + assertCacheState(client, index, 0, 0); - final SearchResponse r1 = client.prepareSearch("index") + final SearchResponse r1 = client.prepareSearch(index) .setSearchType(SearchType.QUERY_THEN_FETCH) .setSize(0) .setQuery(QueryBuilders.rangeQuery("s").gte("2016-03-19").lte("2016-03-25")) @@ -202,9 +222,9 @@ public void testQueryRewrite() throws Exception { .get(); OpenSearchAssertions.assertAllSuccessful(r1); assertThat(r1.getHits().getTotalHits().value, equalTo(7L)); - assertCacheState(client, "index", 0, 5); + assertCacheState(client, index, 0, 5); - final SearchResponse r2 = client.prepareSearch("index") + final SearchResponse r2 = client.prepareSearch(index) .setSearchType(SearchType.QUERY_THEN_FETCH) .setSize(0) .setQuery(QueryBuilders.rangeQuery("s").gte("2016-03-20").lte("2016-03-26")) @@ -212,9 +232,9 @@ public void testQueryRewrite() throws Exception { .get(); OpenSearchAssertions.assertAllSuccessful(r2); assertThat(r2.getHits().getTotalHits().value, equalTo(7L)); - assertCacheState(client, "index", 3, 7); + assertCacheState(client, index, 3, 7); - final SearchResponse r3 = client.prepareSearch("index") + final SearchResponse r3 = client.prepareSearch(index) .setSearchType(SearchType.QUERY_THEN_FETCH) .setSize(0) .setQuery(QueryBuilders.rangeQuery("s").gte("2016-03-21").lte("2016-03-27")) @@ -222,15 +242,16 @@ public void testQueryRewrite() throws Exception { .get(); OpenSearchAssertions.assertAllSuccessful(r3); assertThat(r3.getHits().getTotalHits().value, equalTo(7L)); - assertCacheState(client, "index", 6, 9); + assertCacheState(client, index, 6, 9); } public void testQueryRewriteMissingValues() throws Exception { Client client = client(); + String index = "index"; assertAcked( client.admin() 
.indices() - .prepareCreate("index") + .prepareCreate(index) .setMapping("s", "type=date") .setSettings( Settings.builder() @@ -242,61 +263,62 @@ public void testQueryRewriteMissingValues() throws Exception { ); indexRandom( true, - client.prepareIndex("index").setId("1").setSource("s", "2016-03-19"), - client.prepareIndex("index").setId("2").setSource("s", "2016-03-20"), - client.prepareIndex("index").setId("3").setSource("s", "2016-03-21"), - client.prepareIndex("index").setId("4").setSource("s", "2016-03-22"), - client.prepareIndex("index").setId("5").setSource("s", "2016-03-23"), - client.prepareIndex("index").setId("6").setSource("s", "2016-03-24"), - client.prepareIndex("index").setId("7").setSource("other", "value"), - client.prepareIndex("index").setId("8").setSource("s", "2016-03-26"), - client.prepareIndex("index").setId("9").setSource("s", "2016-03-27") + client.prepareIndex(index).setId("1").setSource("s", "2016-03-19"), + client.prepareIndex(index).setId("2").setSource("s", "2016-03-20"), + client.prepareIndex(index).setId("3").setSource("s", "2016-03-21"), + client.prepareIndex(index).setId("4").setSource("s", "2016-03-22"), + client.prepareIndex(index).setId("5").setSource("s", "2016-03-23"), + client.prepareIndex(index).setId("6").setSource("s", "2016-03-24"), + client.prepareIndex(index).setId("7").setSource("other", "value"), + client.prepareIndex(index).setId("8").setSource("s", "2016-03-26"), + client.prepareIndex(index).setId("9").setSource("s", "2016-03-27") ); - ensureSearchable("index"); - assertCacheState(client, "index", 0, 0); + ensureSearchable(index); + assertCacheState(client, index, 0, 0); // Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache - ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge("index").setFlush(true).get(); + ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge(index).setFlush(true).get(); OpenSearchAssertions.assertAllSuccessful(forceMergeResponse); refreshAndWaitForReplication(); - ensureSearchable("index"); + ensureSearchable(index); - assertCacheState(client, "index", 0, 0); + assertCacheState(client, index, 0, 0); - final SearchResponse r1 = client.prepareSearch("index") + final SearchResponse r1 = client.prepareSearch(index) .setSearchType(SearchType.QUERY_THEN_FETCH) .setSize(0) .setQuery(QueryBuilders.rangeQuery("s").gte("2016-03-19").lte("2016-03-28")) .get(); OpenSearchAssertions.assertAllSuccessful(r1); assertThat(r1.getHits().getTotalHits().value, equalTo(8L)); - assertCacheState(client, "index", 0, 1); + assertCacheState(client, index, 0, 1); - final SearchResponse r2 = client.prepareSearch("index") + final SearchResponse r2 = client.prepareSearch(index) .setSearchType(SearchType.QUERY_THEN_FETCH) .setSize(0) .setQuery(QueryBuilders.rangeQuery("s").gte("2016-03-19").lte("2016-03-28")) .get(); OpenSearchAssertions.assertAllSuccessful(r2); assertThat(r2.getHits().getTotalHits().value, equalTo(8L)); - assertCacheState(client, "index", 1, 1); + assertCacheState(client, index, 1, 1); - final SearchResponse r3 = client.prepareSearch("index") + final SearchResponse r3 = client.prepareSearch(index) .setSearchType(SearchType.QUERY_THEN_FETCH) .setSize(0) .setQuery(QueryBuilders.rangeQuery("s").gte("2016-03-19").lte("2016-03-28")) .get(); OpenSearchAssertions.assertAllSuccessful(r3); assertThat(r3.getHits().getTotalHits().value, equalTo(8L)); - assertCacheState(client, "index", 2, 1); + 
assertCacheState(client, index, 2, 1); } public void testQueryRewriteDates() throws Exception { Client client = client(); + String index = "index"; assertAcked( client.admin() .indices() - .prepareCreate("index") + .prepareCreate(index) .setMapping("d", "type=date") .setSettings( Settings.builder() @@ -308,28 +330,28 @@ public void testQueryRewriteDates() throws Exception { ); indexRandom( true, - client.prepareIndex("index").setId("1").setSource("d", "2014-01-01T00:00:00"), - client.prepareIndex("index").setId("2").setSource("d", "2014-02-01T00:00:00"), - client.prepareIndex("index").setId("3").setSource("d", "2014-03-01T00:00:00"), - client.prepareIndex("index").setId("4").setSource("d", "2014-04-01T00:00:00"), - client.prepareIndex("index").setId("5").setSource("d", "2014-05-01T00:00:00"), - client.prepareIndex("index").setId("6").setSource("d", "2014-06-01T00:00:00"), - client.prepareIndex("index").setId("7").setSource("d", "2014-07-01T00:00:00"), - client.prepareIndex("index").setId("8").setSource("d", "2014-08-01T00:00:00"), - client.prepareIndex("index").setId("9").setSource("d", "2014-09-01T00:00:00") + client.prepareIndex(index).setId("1").setSource("d", "2014-01-01T00:00:00"), + client.prepareIndex(index).setId("2").setSource("d", "2014-02-01T00:00:00"), + client.prepareIndex(index).setId("3").setSource("d", "2014-03-01T00:00:00"), + client.prepareIndex(index).setId("4").setSource("d", "2014-04-01T00:00:00"), + client.prepareIndex(index).setId("5").setSource("d", "2014-05-01T00:00:00"), + client.prepareIndex(index).setId("6").setSource("d", "2014-06-01T00:00:00"), + client.prepareIndex(index).setId("7").setSource("d", "2014-07-01T00:00:00"), + client.prepareIndex(index).setId("8").setSource("d", "2014-08-01T00:00:00"), + client.prepareIndex(index).setId("9").setSource("d", "2014-09-01T00:00:00") ); - ensureSearchable("index"); - assertCacheState(client, "index", 0, 0); + ensureSearchable(index); + assertCacheState(client, index, 0, 0); // Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache - ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge("index").setFlush(true).get(); + ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge(index).setFlush(true).get(); OpenSearchAssertions.assertAllSuccessful(forceMergeResponse); refreshAndWaitForReplication(); - ensureSearchable("index"); + ensureSearchable(index); - assertCacheState(client, "index", 0, 0); + assertCacheState(client, index, 0, 0); - final SearchResponse r1 = client.prepareSearch("index") + final SearchResponse r1 = client.prepareSearch(index) .setSearchType(SearchType.QUERY_THEN_FETCH) .setSize(0) .setQuery(QueryBuilders.rangeQuery("d").gte("2013-01-01T00:00:00").lte("now")) @@ -338,9 +360,9 @@ public void testQueryRewriteDates() throws Exception { .get(); OpenSearchAssertions.assertAllSuccessful(r1); assertThat(r1.getHits().getTotalHits().value, equalTo(9L)); - assertCacheState(client, "index", 0, 1); + assertCacheState(client, index, 0, 1); - final SearchResponse r2 = client.prepareSearch("index") + final SearchResponse r2 = client.prepareSearch(index) .setSearchType(SearchType.QUERY_THEN_FETCH) .setSize(0) .setQuery(QueryBuilders.rangeQuery("d").gte("2013-01-01T00:00:00").lte("now")) @@ -348,9 +370,9 @@ public void testQueryRewriteDates() throws Exception { .get(); OpenSearchAssertions.assertAllSuccessful(r2); assertThat(r2.getHits().getTotalHits().value, equalTo(9L)); - 
assertCacheState(client, "index", 1, 1); + assertCacheState(client, index, 1, 1); - final SearchResponse r3 = client.prepareSearch("index") + final SearchResponse r3 = client.prepareSearch(index) .setSearchType(SearchType.QUERY_THEN_FETCH) .setSize(0) .setQuery(QueryBuilders.rangeQuery("d").gte("2013-01-01T00:00:00").lte("now")) @@ -358,7 +380,7 @@ public void testQueryRewriteDates() throws Exception { .get(); OpenSearchAssertions.assertAllSuccessful(r3); assertThat(r3.getHits().getTotalHits().value, equalTo(9L)); - assertCacheState(client, "index", 2, 1); + assertCacheState(client, index, 2, 1); } public void testQueryRewriteDatesWithNow() throws Exception { @@ -449,53 +471,54 @@ public void testCanCache() throws Exception { .put("index.number_of_routing_shards", 2) .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) .build(); - assertAcked(client.admin().indices().prepareCreate("index").setMapping("s", "type=date").setSettings(settings).get()); + String index = "index"; + assertAcked(client.admin().indices().prepareCreate(index).setMapping("s", "type=date").setSettings(settings).get()); indexRandom( true, - client.prepareIndex("index").setId("1").setRouting("1").setSource("s", "2016-03-19"), - client.prepareIndex("index").setId("2").setRouting("1").setSource("s", "2016-03-20"), - client.prepareIndex("index").setId("3").setRouting("1").setSource("s", "2016-03-21"), - client.prepareIndex("index").setId("4").setRouting("2").setSource("s", "2016-03-22"), - client.prepareIndex("index").setId("5").setRouting("2").setSource("s", "2016-03-23"), - client.prepareIndex("index").setId("6").setRouting("2").setSource("s", "2016-03-24"), - client.prepareIndex("index").setId("7").setRouting("3").setSource("s", "2016-03-25"), - client.prepareIndex("index").setId("8").setRouting("3").setSource("s", "2016-03-26"), - client.prepareIndex("index").setId("9").setRouting("3").setSource("s", "2016-03-27") + client.prepareIndex(index).setId("1").setRouting("1").setSource("s", "2016-03-19"), + client.prepareIndex(index).setId("2").setRouting("1").setSource("s", "2016-03-20"), + client.prepareIndex(index).setId("3").setRouting("1").setSource("s", "2016-03-21"), + client.prepareIndex(index).setId("4").setRouting("2").setSource("s", "2016-03-22"), + client.prepareIndex(index).setId("5").setRouting("2").setSource("s", "2016-03-23"), + client.prepareIndex(index).setId("6").setRouting("2").setSource("s", "2016-03-24"), + client.prepareIndex(index).setId("7").setRouting("3").setSource("s", "2016-03-25"), + client.prepareIndex(index).setId("8").setRouting("3").setSource("s", "2016-03-26"), + client.prepareIndex(index).setId("9").setRouting("3").setSource("s", "2016-03-27") ); - ensureSearchable("index"); - assertCacheState(client, "index", 0, 0); + ensureSearchable(index); + assertCacheState(client, index, 0, 0); // Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache - ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge("index").setFlush(true).get(); + ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge(index).setFlush(true).get(); OpenSearchAssertions.assertAllSuccessful(forceMergeResponse); refreshAndWaitForReplication(); - ensureSearchable("index"); + ensureSearchable(index); - assertCacheState(client, "index", 0, 0); + assertCacheState(client, index, 0, 0); // If size > 0 we should no cache by default - final SearchResponse r1 = client.prepareSearch("index") + final SearchResponse r1 
= client.prepareSearch(index) .setSearchType(SearchType.QUERY_THEN_FETCH) .setSize(1) .setQuery(QueryBuilders.rangeQuery("s").gte("2016-03-19").lte("2016-03-25")) .get(); OpenSearchAssertions.assertAllSuccessful(r1); assertThat(r1.getHits().getTotalHits().value, equalTo(7L)); - assertCacheState(client, "index", 0, 0); + assertCacheState(client, index, 0, 0); // If search type is DFS_QUERY_THEN_FETCH we should not cache - final SearchResponse r2 = client.prepareSearch("index") + final SearchResponse r2 = client.prepareSearch(index) .setSearchType(SearchType.DFS_QUERY_THEN_FETCH) .setSize(0) .setQuery(QueryBuilders.rangeQuery("s").gte("2016-03-20").lte("2016-03-26")) .get(); OpenSearchAssertions.assertAllSuccessful(r2); assertThat(r2.getHits().getTotalHits().value, equalTo(7L)); - assertCacheState(client, "index", 0, 0); + assertCacheState(client, index, 0, 0); // If search type is DFS_QUERY_THEN_FETCH we should not cache even if // the cache flag is explicitly set on the request - final SearchResponse r3 = client.prepareSearch("index") + final SearchResponse r3 = client.prepareSearch(index) .setSearchType(SearchType.DFS_QUERY_THEN_FETCH) .setSize(0) .setRequestCache(true) @@ -503,10 +526,10 @@ public void testCanCache() throws Exception { .get(); OpenSearchAssertions.assertAllSuccessful(r3); assertThat(r3.getHits().getTotalHits().value, equalTo(7L)); - assertCacheState(client, "index", 0, 0); + assertCacheState(client, index, 0, 0); // If the request has an non-filter aggregation containing now we should not cache - final SearchResponse r5 = client.prepareSearch("index") + final SearchResponse r5 = client.prepareSearch(index) .setSearchType(SearchType.QUERY_THEN_FETCH) .setSize(0) .setRequestCache(true) @@ -515,10 +538,10 @@ public void testCanCache() throws Exception { .get(); OpenSearchAssertions.assertAllSuccessful(r5); assertThat(r5.getHits().getTotalHits().value, equalTo(7L)); - assertCacheState(client, "index", 0, 0); + assertCacheState(client, index, 0, 0); // If size > 1 and cache flag is set on the request we should cache - final SearchResponse r6 = client.prepareSearch("index") + final SearchResponse r6 = client.prepareSearch(index) .setSearchType(SearchType.QUERY_THEN_FETCH) .setSize(1) .setRequestCache(true) @@ -526,10 +549,10 @@ public void testCanCache() throws Exception { .get(); OpenSearchAssertions.assertAllSuccessful(r6); assertThat(r6.getHits().getTotalHits().value, equalTo(7L)); - assertCacheState(client, "index", 0, 2); + assertCacheState(client, index, 0, 2); // If the request has a filter aggregation containing now we should cache since it gets rewritten - final SearchResponse r4 = client.prepareSearch("index") + final SearchResponse r4 = client.prepareSearch(index) .setSearchType(SearchType.QUERY_THEN_FETCH) .setSize(0) .setRequestCache(true) @@ -538,7 +561,7 @@ public void testCanCache() throws Exception { .get(); OpenSearchAssertions.assertAllSuccessful(r4); assertThat(r4.getHits().getTotalHits().value, equalTo(7L)); - assertCacheState(client, "index", 0, 4); + assertCacheState(client, index, 0, 4); } public void testCacheWithFilteredAlias() throws InterruptedException { @@ -548,61 +571,63 @@ public void testCacheWithFilteredAlias() throws InterruptedException { .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) .build(); + String index = "index"; assertAcked( client.admin() .indices() - .prepareCreate("index") + .prepareCreate(index) .setMapping("created_at", "type=date") .setSettings(settings) .addAlias(new 
Alias("last_week").filter(QueryBuilders.rangeQuery("created_at").gte("now-7d/d"))) .get() ); ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC); - client.prepareIndex("index").setId("1").setRouting("1").setSource("created_at", DateTimeFormatter.ISO_LOCAL_DATE.format(now)).get(); + client.prepareIndex(index).setId("1").setRouting("1").setSource("created_at", DateTimeFormatter.ISO_LOCAL_DATE.format(now)).get(); // Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache - ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge("index").setFlush(true).get(); + ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge(index).setFlush(true).get(); OpenSearchAssertions.assertAllSuccessful(forceMergeResponse); refreshAndWaitForReplication(); - indexRandomForConcurrentSearch("index"); + indexRandomForConcurrentSearch(index); - assertCacheState(client, "index", 0, 0); + assertCacheState(client, index, 0, 0); - SearchResponse r1 = client.prepareSearch("index") + SearchResponse r1 = client.prepareSearch(index) .setSearchType(SearchType.QUERY_THEN_FETCH) .setSize(0) .setQuery(QueryBuilders.rangeQuery("created_at").gte("now-7d/d")) .get(); OpenSearchAssertions.assertAllSuccessful(r1); assertThat(r1.getHits().getTotalHits().value, equalTo(1L)); - assertCacheState(client, "index", 0, 1); + assertCacheState(client, index, 0, 1); - r1 = client.prepareSearch("index") + r1 = client.prepareSearch(index) .setSearchType(SearchType.QUERY_THEN_FETCH) .setSize(0) .setQuery(QueryBuilders.rangeQuery("created_at").gte("now-7d/d")) .get(); OpenSearchAssertions.assertAllSuccessful(r1); assertThat(r1.getHits().getTotalHits().value, equalTo(1L)); - assertCacheState(client, "index", 1, 1); + assertCacheState(client, index, 1, 1); r1 = client.prepareSearch("last_week").setSearchType(SearchType.QUERY_THEN_FETCH).setSize(0).get(); OpenSearchAssertions.assertAllSuccessful(r1); assertThat(r1.getHits().getTotalHits().value, equalTo(1L)); - assertCacheState(client, "index", 1, 2); + assertCacheState(client, index, 1, 2); r1 = client.prepareSearch("last_week").setSearchType(SearchType.QUERY_THEN_FETCH).setSize(0).get(); OpenSearchAssertions.assertAllSuccessful(r1); assertThat(r1.getHits().getTotalHits().value, equalTo(1L)); - assertCacheState(client, "index", 2, 2); + assertCacheState(client, index, 2, 2); } public void testProfileDisableCache() throws Exception { Client client = client(); + String index = "index"; assertAcked( client.admin() .indices() - .prepareCreate("index") + .prepareCreate(index) .setMapping("k", "type=keyword") .setSettings( Settings.builder() @@ -612,14 +637,14 @@ public void testProfileDisableCache() throws Exception { ) .get() ); - indexRandom(true, client.prepareIndex("index").setSource("k", "hello")); - ensureSearchable("index"); + indexRandom(true, client.prepareIndex(index).setSource("k", "hello")); + ensureSearchable(index); int expectedHits = 0; int expectedMisses = 0; for (int i = 0; i < 5; i++) { boolean profile = i % 2 == 0; - SearchResponse resp = client.prepareSearch("index") + SearchResponse resp = client.prepareSearch(index) .setRequestCache(true) .setProfile(profile) .setQuery(QueryBuilders.termQuery("k", "hello")) @@ -634,16 +659,17 @@ public void testProfileDisableCache() throws Exception { expectedHits++; } } - assertCacheState(client, "index", expectedHits, expectedMisses); + assertCacheState(client, index, expectedHits, expectedMisses); } } public void 
testCacheWithInvalidation() throws Exception { Client client = client(); + String index = "index"; assertAcked( client.admin() .indices() - .prepareCreate("index") + .prepareCreate(index) .setMapping("k", "type=keyword") .setSettings( Settings.builder() @@ -653,38 +679,581 @@ public void testCacheWithInvalidation() throws Exception { ) .get() ); - indexRandom(true, client.prepareIndex("index").setSource("k", "hello")); - ensureSearchable("index"); - SearchResponse resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get(); + indexRandom(true, client.prepareIndex(index).setSource("k", "hello")); + ensureSearchable(index); + SearchResponse resp = client.prepareSearch(index).setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get(); assertSearchResponse(resp); OpenSearchAssertions.assertAllSuccessful(resp); assertThat(resp.getHits().getTotalHits().value, equalTo(1L)); - assertCacheState(client, "index", 0, 1); + assertCacheState(client, index, 0, 1); // Index but don't refresh - indexRandom(false, client.prepareIndex("index").setSource("k", "hello2")); - resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get(); + indexRandom(false, client.prepareIndex(index).setSource("k", "hello2")); + resp = client.prepareSearch(index).setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get(); assertSearchResponse(resp); // Should expect hit as here as refresh didn't happen - assertCacheState(client, "index", 1, 1); + assertCacheState(client, index, 1, 1); // Explicit refresh would invalidate cache refreshAndWaitForReplication(); // Hit same query again - resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get(); + resp = client.prepareSearch(index).setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get(); assertSearchResponse(resp); // Should expect miss as key has changed due to change in IndexReader.CacheKey (due to refresh) - assertCacheState(client, "index", 1, 2); + assertCacheState(client, index, 1, 2); + } + + // calling cache clear api, when staleness threshold is lower than staleness, it should clean the stale keys from cache + public void testCacheClearAPIRemovesStaleKeysWhenStalenessThresholdIsLow() throws Exception { + String node = internalCluster().startNode( + Settings.builder() + .put(IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_STALENESS_THRESHOLD_SETTING_KEY, 0.10) + .put( + IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_INTERVAL_SETTING_KEY, + // setting intentionally high to avoid cache cleaner interfering + TimeValue.timeValueMillis(300) + ) + ); + Client client = client(node); + String index1 = "index1"; + String index2 = "index2"; + setupIndex(client, index1); + setupIndex(client, index2); + + // create first cache entry in index1 + createCacheEntry(client, index1, "hello"); + assertCacheState(client, index1, 0, 1); + long memorySizeForIndex1 = getRequestCacheStats(client, index1).getMemorySizeInBytes(); + assertTrue(memorySizeForIndex1 > 0); + + // create second cache entry in index1 + createCacheEntry(client, index1, "there"); + assertCacheState(client, index1, 0, 2); + long finalMemorySizeForIndex1 = getRequestCacheStats(client, index1).getMemorySizeInBytes(); + assertTrue(finalMemorySizeForIndex1 > memorySizeForIndex1); + + // create first cache entry in index2 + createCacheEntry(client, index2, "hello"); + assertCacheState(client, index2, 0, 1); + 
assertTrue(getRequestCacheStats(client, index2).getMemorySizeInBytes() > 0); + + ClearIndicesCacheRequest clearIndicesCacheRequest = new ClearIndicesCacheRequest(index2); + client.admin().indices().clearCache(clearIndicesCacheRequest).actionGet(); + + // cache cleaner should have cleaned up the stale key from index 2 + assertEquals(0, getRequestCacheStats(client, index2).getMemorySizeInBytes()); + // cache cleaner should NOT have cleaned from index 1 + assertEquals(finalMemorySizeForIndex1, getRequestCacheStats(client, index1).getMemorySizeInBytes()); + } + + // when staleness threshold is lower than staleness, it should clean the stale keys from cache + public void testStaleKeysCleanupWithLowThreshold() throws Exception { + int cacheCleanIntervalInMillis = 1; + String node = internalCluster().startNode( + Settings.builder() + .put(IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_STALENESS_THRESHOLD_SETTING_KEY, 0.10) + .put( + IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_INTERVAL_SETTING_KEY, + TimeValue.timeValueMillis(cacheCleanIntervalInMillis) + ) + ); + Client client = client(node); + String index1 = "index1"; + String index2 = "index2"; + setupIndex(client, index1); + setupIndex(client, index2); + + // create first cache entry in index1 + createCacheEntry(client, index1, "hello"); + assertCacheState(client, index1, 0, 1); + long memorySizeForIndex1 = getRequestCacheStats(client, index1).getMemorySizeInBytes(); + assertTrue(memorySizeForIndex1 > 0); + + // create second cache entry in index1 + createCacheEntry(client, index1, "there"); + assertCacheState(client, index1, 0, 2); + long finalMemorySizeForIndex1 = getRequestCacheStats(client, index1).getMemorySizeInBytes(); + assertTrue(finalMemorySizeForIndex1 > memorySizeForIndex1); + + // create first cache entry in index2 + createCacheEntry(client, index2, "hello"); + assertCacheState(client, index2, 0, 1); + assertTrue(getRequestCacheStats(client, index2).getMemorySizeInBytes() > 0); + + // force refresh so that it creates 1 stale key + flushAndRefresh(index2); + // sleep until cache cleaner would have cleaned up the stale key from index 2 + assertBusy(() -> { + // cache cleaner should have cleaned up the stale key from index 2 + assertEquals(0, getRequestCacheStats(client, index2).getMemorySizeInBytes()); + // cache cleaner should NOT have cleaned from index 1 + assertEquals(finalMemorySizeForIndex1, getRequestCacheStats(client, index1).getMemorySizeInBytes()); + }, cacheCleanIntervalInMillis * 2, TimeUnit.MILLISECONDS); + // sleep until cache cleaner would have cleaned up the stale key from index 2 + } + + // when staleness threshold is equal to staleness, it should clean the stale keys from cache + public void testCacheCleanupOnEqualStalenessAndThreshold() throws Exception { + int cacheCleanIntervalInMillis = 1; + String node = internalCluster().startNode( + Settings.builder() + .put(IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_STALENESS_THRESHOLD_SETTING_KEY, 0.33) + .put( + IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_INTERVAL_SETTING_KEY, + TimeValue.timeValueMillis(cacheCleanIntervalInMillis) + ) + ); + Client client = client(node); + String index1 = "index1"; + String index2 = "index2"; + setupIndex(client, index1); + setupIndex(client, index2); + + // create first cache entry in index1 + createCacheEntry(client, index1, "hello"); + assertCacheState(client, index1, 0, 1); + long memorySizeForIndex1 = getRequestCacheStats(client, index1).getMemorySizeInBytes(); + assertTrue(memorySizeForIndex1 > 0); + + // 
create second cache entry in index1 + createCacheEntry(client, index1, "there"); + assertCacheState(client, index1, 0, 2); + long finalMemorySizeForIndex1 = getRequestCacheStats(client, index1).getMemorySizeInBytes(); + assertTrue(finalMemorySizeForIndex1 > memorySizeForIndex1); + + // create first cache entry in index2 + createCacheEntry(client, index2, "hello"); + assertCacheState(client, index2, 0, 1); + assertTrue(getRequestCacheStats(client, index2).getMemorySizeInBytes() > 0); + + // force refresh so that it creates 1 stale key + flushAndRefresh(index2); + // sleep until cache cleaner would have cleaned up the stale key from index 2 + assertBusy(() -> { + // cache cleaner should have cleaned up the stale key from index 2 + assertEquals(0, getRequestCacheStats(client, index2).getMemorySizeInBytes()); + // cache cleaner should NOT have cleaned from index 1 + assertEquals(finalMemorySizeForIndex1, getRequestCacheStats(client, index1).getMemorySizeInBytes()); + }, cacheCleanIntervalInMillis * 2, TimeUnit.MILLISECONDS); + } + + // when staleness threshold is higher than staleness, it should NOT clean the cache + public void testCacheCleanupSkipsWithHighStalenessThreshold() throws Exception { + int cacheCleanIntervalInMillis = 1; + String node = internalCluster().startNode( + Settings.builder() + .put(IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_STALENESS_THRESHOLD_SETTING_KEY, 0.90) + .put( + IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_INTERVAL_SETTING_KEY, + TimeValue.timeValueMillis(cacheCleanIntervalInMillis) + ) + ); + Client client = client(node); + String index1 = "index1"; + String index2 = "index2"; + setupIndex(client, index1); + setupIndex(client, index2); + + // create first cache entry in index1 + createCacheEntry(client, index1, "hello"); + assertCacheState(client, index1, 0, 1); + long memorySizeForIndex1 = getRequestCacheStats(client, index1).getMemorySizeInBytes(); + assertTrue(memorySizeForIndex1 > 0); + + // create second cache entry in index1 + createCacheEntry(client, index1, "there"); + assertCacheState(client, index1, 0, 2); + long finalMemorySizeForIndex1 = getRequestCacheStats(client, index1).getMemorySizeInBytes(); + assertTrue(finalMemorySizeForIndex1 > memorySizeForIndex1); + + // create first cache entry in index2 + createCacheEntry(client, index2, "hello"); + assertCacheState(client, index2, 0, 1); + assertTrue(getRequestCacheStats(client, index2).getMemorySizeInBytes() > 0); + + // force refresh so that it creates 1 stale key + flushAndRefresh(index2); + // sleep until cache cleaner would have cleaned up the stale key from index 2 + assertBusy(() -> { + // cache cleaner should NOT have cleaned up the stale key from index 2 + assertTrue(getRequestCacheStats(client, index2).getMemorySizeInBytes() > 0); + // cache cleaner should NOT have cleaned from index 1 + assertEquals(finalMemorySizeForIndex1, getRequestCacheStats(client, index1).getMemorySizeInBytes()); + }, cacheCleanIntervalInMillis * 2, TimeUnit.MILLISECONDS); + } + + // when staleness threshold is explicitly set to 0, cache cleaner regularly cleans up stale keys. 
+ public void testCacheCleanupOnZeroStalenessThreshold() throws Exception { + int cacheCleanIntervalInMillis = 50; + String node = internalCluster().startNode( + Settings.builder() + .put(IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_STALENESS_THRESHOLD_SETTING_KEY, 0) + .put( + IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_INTERVAL_SETTING_KEY, + TimeValue.timeValueMillis(cacheCleanIntervalInMillis) + ) + ); + Client client = client(node); + String index1 = "index1"; + String index2 = "index2"; + setupIndex(client, index1); + setupIndex(client, index2); + + // create 10 index1 cache entries + for (int i = 1; i <= 10; i++) { + long cacheSizeBefore = getRequestCacheStats(client, index1).getMemorySizeInBytes(); + createCacheEntry(client, index1, "hello" + i); + assertCacheState(client, index1, 0, i); + long cacheSizeAfter = getRequestCacheStats(client, index1).getMemorySizeInBytes(); + assertTrue(cacheSizeAfter > cacheSizeBefore); + } + + long finalMemorySizeForIndex1 = getRequestCacheStats(client, index1).getMemorySizeInBytes(); + + // create first cache entry in index2 + createCacheEntry(client, index2, "hello"); + assertCacheState(client, index2, 0, 1); + assertTrue(getRequestCacheStats(client, index2).getMemorySizeInBytes() > 0); + + // force refresh so that it creates 1 stale key + flushAndRefresh(index2); + // sleep until cache cleaner would have cleaned up the stale key from index 2 + assertBusy(() -> { + // cache cleaner should have cleaned up the stale key from index 2 + assertEquals(0, getRequestCacheStats(client, index2).getMemorySizeInBytes()); + // cache cleaner should NOT have cleaned from index 1 + assertEquals(finalMemorySizeForIndex1, getRequestCacheStats(client, index1).getMemorySizeInBytes()); + }, cacheCleanIntervalInMillis * 2, TimeUnit.MILLISECONDS); + } + + // when staleness threshold is not explicitly set, cache cleaner regularly cleans up stale keys + public void testStaleKeysRemovalWithoutExplicitThreshold() throws Exception { + int cacheCleanIntervalInMillis = 1; + String node = internalCluster().startNode( + Settings.builder() + .put( + IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_INTERVAL_SETTING_KEY, + TimeValue.timeValueMillis(cacheCleanIntervalInMillis) + ) + ); + String index1 = "index1"; + String index2 = "index2"; + Client client = client(node); + setupIndex(client, index1); + setupIndex(client, index2); + + // create first cache entry in index1 + createCacheEntry(client, index1, "hello"); + assertCacheState(client, index1, 0, 1); + long memorySizeForIndex1 = getRequestCacheStats(client, index1).getMemorySizeInBytes(); + assertTrue(memorySizeForIndex1 > 0); + + // create second cache entry in index1 + createCacheEntry(client, index1, "there"); + assertCacheState(client, index1, 0, 2); + long finalMemorySizeForIndex1 = getRequestCacheStats(client, index1).getMemorySizeInBytes(); + assertTrue(finalMemorySizeForIndex1 > memorySizeForIndex1); + + // create first cache entry in index2 + createCacheEntry(client, index2, "hello"); + assertCacheState(client, index2, 0, 1); + assertTrue(getRequestCacheStats(client, index2).getMemorySizeInBytes() > 0); + + // force refresh so that it creates 1 stale key + flushAndRefresh(index2); + // sleep until cache cleaner would have cleaned up the stale key from index 2 + assertBusy(() -> { + // cache cleaner should have cleaned up the stale key from index 2 + assertEquals(0, getRequestCacheStats(client, index2).getMemorySizeInBytes()); + // cache cleaner should NOT have cleaned from index 1 + 
assertEquals(finalMemorySizeForIndex1, getRequestCacheStats(client, index1).getMemorySizeInBytes()); + }, cacheCleanIntervalInMillis * 2, TimeUnit.MILLISECONDS); + } + + // when cache cleaner interval setting is not set, cache cleaner is configured appropriately with the fall-back setting + public void testCacheCleanupWithDefaultSettings() throws Exception { + int cacheCleanIntervalInMillis = 1; + String node = internalCluster().startNode( + Settings.builder().put(INDICES_CACHE_CLEANUP_INTERVAL_SETTING_KEY, TimeValue.timeValueMillis(cacheCleanIntervalInMillis)) + ); + Client client = client(node); + String index1 = "index1"; + String index2 = "index2"; + setupIndex(client, index1); + setupIndex(client, index2); + + // create first cache entry in index1 + createCacheEntry(client, index1, "hello"); + assertCacheState(client, index1, 0, 1); + long memorySizeForIndex1 = getRequestCacheStats(client, index1).getMemorySizeInBytes(); + assertTrue(memorySizeForIndex1 > 0); + + // create second cache entry in index1 + createCacheEntry(client, index1, "there"); + assertCacheState(client, index1, 0, 2); + long finalMemorySizeForIndex1 = getRequestCacheStats(client, index1).getMemorySizeInBytes(); + assertTrue(finalMemorySizeForIndex1 > memorySizeForIndex1); + + // create first cache entry in index2 + createCacheEntry(client, index2, "hello"); + assertCacheState(client, index2, 0, 1); + assertTrue(getRequestCacheStats(client, index2).getMemorySizeInBytes() > 0); + + // force refresh so that it creates 1 stale key + flushAndRefresh(index2); + // sleep until cache cleaner would have cleaned up the stale key from index 2 + assertBusy(() -> { + // cache cleaner should have cleaned up the stale key from index 2 + assertEquals(0, getRequestCacheStats(client, index2).getMemorySizeInBytes()); + // cache cleaner should NOT have cleaned from index 1 + assertEquals(finalMemorySizeForIndex1, getRequestCacheStats(client, index1).getMemorySizeInBytes()); + }, cacheCleanIntervalInMillis * 2, TimeUnit.MILLISECONDS); + } + + // staleness threshold updates flows through to the cache cleaner + public void testDynamicStalenessThresholdUpdate() throws Exception { + int cacheCleanIntervalInMillis = 1; + String node = internalCluster().startNode( + Settings.builder() + .put(IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_STALENESS_THRESHOLD_SETTING_KEY, 0.90) + .put( + IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_INTERVAL_SETTING_KEY, + TimeValue.timeValueMillis(cacheCleanIntervalInMillis) + ) + ); + Client client = client(node); + String index1 = "index1"; + String index2 = "index2"; + setupIndex(client, index1); + setupIndex(client, index2); + + // create first cache entry in index1 + createCacheEntry(client, index1, "hello"); + assertCacheState(client, index1, 0, 1); + long memorySizeForIndex1 = getRequestCacheStats(client, index1).getMemorySizeInBytes(); + assertTrue(memorySizeForIndex1 > 0); + + // create second cache entry in index1 + createCacheEntry(client, index1, "there"); + assertCacheState(client, index1, 0, 2); + assertTrue(getRequestCacheStats(client, index1).getMemorySizeInBytes() > memorySizeForIndex1); + + // create first cache entry in index2 + createCacheEntry(client, index2, "hello"); + assertCacheState(client, index2, 0, 1); + long finalMemorySizeForIndex1 = getRequestCacheStats(client, index1).getMemorySizeInBytes(); + assertTrue(finalMemorySizeForIndex1 > 0); + + // force refresh so that it creates 1 stale key + flushAndRefresh(index2); + assertBusy(() -> { + // cache cleaner should NOT have 
cleaned up the stale key from index 2 + assertTrue(getRequestCacheStats(client, index2).getMemorySizeInBytes() > 0); + }, cacheCleanIntervalInMillis * 2, TimeUnit.MILLISECONDS); + + // Update indices.requests.cache.cleanup.staleness_threshold to "10%" + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + updateSettingsRequest.persistentSettings(Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), 0.10)); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + + assertBusy(() -> { + // cache cleaner should have cleaned up the stale key from index 2 + assertEquals(0, getRequestCacheStats(client, index2).getMemorySizeInBytes()); + // cache cleaner should NOT have cleaned from index 1 + assertEquals(finalMemorySizeForIndex1, getRequestCacheStats(client, index1).getMemorySizeInBytes()); + }, cacheCleanIntervalInMillis * 2, TimeUnit.MILLISECONDS); + } + + // staleness threshold dynamic updates should throw exceptions on invalid input + public void testInvalidStalenessThresholdUpdateThrowsException() throws Exception { + int cacheCleanIntervalInMillis = 1; + String node = internalCluster().startNode( + Settings.builder() + .put(IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_STALENESS_THRESHOLD_SETTING_KEY, 0.90) + .put( + IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_INTERVAL_SETTING_KEY, + TimeValue.timeValueMillis(cacheCleanIntervalInMillis) + ) + ); + Client client = client(node); + String index1 = "index1"; + setupIndex(client, index1); + + // create first cache entry in index1 + createCacheEntry(client, index1, "hello"); + assertCacheState(client, index1, 0, 1); + assertTrue(getRequestCacheStats(client, index1).getMemorySizeInBytes() > 0); + + // Update indices.requests.cache.cleanup.staleness_threshold to "10%" with illegal argument + assertThrows("Ratio should be in [0-1.0]", IllegalArgumentException.class, () -> { + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + updateSettingsRequest.persistentSettings( + Settings.builder().put(IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_STALENESS_THRESHOLD_SETTING_KEY, 10) + ); + client().admin().cluster().updateSettings(updateSettingsRequest).actionGet(); + }); + + // everything else should continue to work fine later on. 
+ // force refresh so that it creates 1 stale key + flushAndRefresh(index1); + // sleep until cache cleaner would have cleaned up the stale key from index 2 + assertBusy(() -> { + // cache cleaner should NOT have cleaned from index 1 + assertEquals(0, getRequestCacheStats(client, index1).getMemorySizeInBytes()); + }, cacheCleanIntervalInMillis * 2, TimeUnit.MILLISECONDS); + } + + // closing the Index after caching will clean up from Indices Request Cache + public void testCacheClearanceAfterIndexClosure() throws Exception { + int cacheCleanIntervalInMillis = 100; + String node = internalCluster().startNode( + Settings.builder() + .put(IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_STALENESS_THRESHOLD_SETTING_KEY, 0.10) + .put( + IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_INTERVAL_SETTING_KEY, + TimeValue.timeValueMillis(cacheCleanIntervalInMillis) + ) + ); + Client client = client(node); + String index = "index"; + setupIndex(client, index); + + // create first cache entry in index + createCacheEntry(client, index, "hello"); + assertCacheState(client, index, 0, 1); + assertTrue(getRequestCacheStats(client, index).getMemorySizeInBytes() > 0); + assertTrue(getNodeCacheStats(client).getMemorySizeInBytes() > 0); + + // close index + assertAcked(client.admin().indices().prepareClose(index)); + // request cache stats cannot be access since Index should be closed + try { + getRequestCacheStats(client, index); + } catch (Exception e) { + assert (e instanceof IndexClosedException); + } + // sleep until cache cleaner would have cleaned up the stale key from index + assertBusy(() -> { + // cache cleaner should have cleaned up the stale keys from index + assertFalse(getNodeCacheStats(client).getMemorySizeInBytes() > 0); + }, cacheCleanIntervalInMillis * 2, TimeUnit.MILLISECONDS); + } + + // deleting the Index after caching will clean up from Indices Request Cache + public void testCacheCleanupAfterIndexDeletion() throws Exception { + int cacheCleanIntervalInMillis = 100; + String node = internalCluster().startNode( + Settings.builder() + .put(IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_STALENESS_THRESHOLD_SETTING_KEY, 0.10) + .put( + IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_INTERVAL_SETTING_KEY, + TimeValue.timeValueMillis(cacheCleanIntervalInMillis) + ) + ); + Client client = client(node); + String index = "index"; + setupIndex(client, index); + + // create first cache entry in index + createCacheEntry(client, index, "hello"); + assertCacheState(client, index, 0, 1); + assertTrue(getRequestCacheStats(client, index).getMemorySizeInBytes() > 0); + assertTrue(getNodeCacheStats(client).getMemorySizeInBytes() > 0); + + // delete index + assertAcked(client.admin().indices().prepareDelete(index)); + // request cache stats cannot be access since Index should be deleted + try { + getRequestCacheStats(client, index); + } catch (Exception e) { + assert (e instanceof IndexNotFoundException); + } + + // sleep until cache cleaner would have cleaned up the stale key from index + assertBusy(() -> { + // cache cleaner should have cleaned up the stale keys from index + assertFalse(getNodeCacheStats(client).getMemorySizeInBytes() > 0); + }, cacheCleanIntervalInMillis * 2, TimeUnit.MILLISECONDS); + } + + // when staleness threshold is lower than staleness, it should clean the cache from all indices having stale keys + public void testStaleKeysCleanupWithMultipleIndices() throws Exception { + int cacheCleanIntervalInMillis = 300; + String node = internalCluster().startNode( + Settings.builder() + 
.put(IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_STALENESS_THRESHOLD_SETTING_KEY, 0.10) + .put( + IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_INTERVAL_SETTING_KEY, + TimeValue.timeValueMillis(cacheCleanIntervalInMillis) + ) + ); + Client client = client(node); + String index1 = "index1"; + String index2 = "index2"; + setupIndex(client, index1); + setupIndex(client, index2); + + // create first cache entry in index1 + createCacheEntry(client, index1, "hello"); + assertCacheState(client, index1, 0, 1); + long memorySizeForIndex1 = getRequestCacheStats(client, index1).getMemorySizeInBytes(); + assertTrue(memorySizeForIndex1 > 0); + + // create second cache entry in index1 + createCacheEntry(client, index1, "there"); + assertCacheState(client, index1, 0, 2); + long finalMemorySizeForIndex1 = getRequestCacheStats(client, index1).getMemorySizeInBytes(); + assertTrue(finalMemorySizeForIndex1 > memorySizeForIndex1); + + // create first cache entry in index2 + createCacheEntry(client, index2, "hello"); + assertCacheState(client, index2, 0, 1); + assertTrue(getRequestCacheStats(client, index2).getMemorySizeInBytes() > 0); + + // force refresh index 1 so that it creates 2 stale keys + flushAndRefresh(index1); + // create another cache entry in index 1, this should not be cleaned up. + createCacheEntry(client, index1, "hello"); + // record the size of this entry + long memorySizeOfLatestEntryForIndex1 = getRequestCacheStats(client, index1).getMemorySizeInBytes() - finalMemorySizeForIndex1; + // force refresh index 2 so that it creates 1 stale key + flushAndRefresh(index2); + // sleep until cache cleaner would have cleaned up the stale key from index 2 + assertBusy(() -> { + // cache cleaner should have cleaned up the stale key from index 2 + assertEquals(0, getRequestCacheStats(client, index2).getMemorySizeInBytes()); + // cache cleaner should have only cleaned up the stale entities + assertEquals(memorySizeOfLatestEntryForIndex1, getRequestCacheStats(client, index1).getMemorySizeInBytes()); + }, cacheCleanIntervalInMillis * 2, TimeUnit.MILLISECONDS); + } + + private void setupIndex(Client client, String index) throws Exception { + assertAcked( + client.admin() + .indices() + .prepareCreate(index) + .setMapping("k", "type=keyword") + .setSettings( + Settings.builder() + .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + ) + .get() + ); + indexRandom(true, client.prepareIndex(index).setSource("k", "hello")); + indexRandom(true, client.prepareIndex(index).setSource("k", "there")); + ensureSearchable(index); + } + + private void createCacheEntry(Client client, String index, String value) { + SearchResponse resp = client.prepareSearch(index).setRequestCache(true).setQuery(QueryBuilders.termQuery("k", value)).get(); + assertSearchResponse(resp); + OpenSearchAssertions.assertAllSuccessful(resp); } private static void assertCacheState(Client client, String index, long expectedHits, long expectedMisses) { - RequestCacheStats requestCacheStats = client.admin() - .indices() - .prepareStats(index) - .setRequestCache(true) - .get() - .getTotal() - .getRequestCache(); + RequestCacheStats requestCacheStats = getRequestCacheStats(client, index); // Check the hit count and miss count together so if they are not // correct we can see both values assertEquals( @@ -694,4 +1263,17 @@ private static void assertCacheState(Client client, String index, long expectedH } + private static 
+        return client.admin().indices().prepareStats(index).setRequestCache(true).get().getTotal().getRequestCache();
+    }
+
+    private static RequestCacheStats getNodeCacheStats(Client client) {
+        NodesStatsResponse stats = client.admin().cluster().prepareNodesStats().execute().actionGet();
+        for (NodeStats stat : stats.getNodes()) {
+            if (stat.getNode().isDataNode()) {
+                return stat.getIndices().getRequestCache();
+            }
+        }
+        return null;
+    }
 }
diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java
index 2140fc59f35cd..0485374affbda 100644
--- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java
+++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java
@@ -498,6 +498,8 @@ public void apply(Settings value, Settings current, Settings previous) {
             IndicesFieldDataCache.INDICES_FIELDDATA_CACHE_SIZE_KEY,
             IndicesRequestCache.INDICES_CACHE_QUERY_SIZE,
             IndicesRequestCache.INDICES_CACHE_QUERY_EXPIRE,
+            IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_INTERVAL_SETTING,
+            IndicesRequestCache.INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING,
             HunspellService.HUNSPELL_LAZY_LOAD,
             HunspellService.HUNSPELL_IGNORE_CASE,
             HunspellService.HUNSPELL_DICTIONARY_OPTIONS,
diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java
index 362d6684c2835..d5651cc01e654 100644
--- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java
+++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java
@@ -112,6 +112,10 @@ public final class IndicesRequestCache implements RemovalListener
 
+    public static final String INDICES_REQUEST_CACHE_CLEANUP_INTERVAL_SETTING_KEY = "indices.requests.cache.cleanup.interval";
+    public static final String INDICES_REQUEST_CACHE_CLEANUP_STALENESS_THRESHOLD_SETTING_KEY =
+        "indices.requests.cache.cleanup.staleness_threshold";
+
     public static final Setting<Boolean> INDEX_CACHE_REQUEST_ENABLED_SETTING = Setting.boolSetting(
         "index.requests.cache.enable",
         true,
@@ -128,15 +132,16 @@ public final class IndicesRequestCache implements RemovalListener
-    public static final Setting<TimeValue> INDICES_REQUEST_CACHE_CLEAN_INTERVAL_SETTING = Setting.positiveTimeSetting(
-        "indices.requests.cache.cleanup.interval",
+    public static final Setting<TimeValue> INDICES_REQUEST_CACHE_CLEANUP_INTERVAL_SETTING = Setting.positiveTimeSetting(
+        INDICES_REQUEST_CACHE_CLEANUP_INTERVAL_SETTING_KEY,
         INDICES_CACHE_CLEAN_INTERVAL_SETTING,
         Property.NodeScope
     );
 
     public static final Setting<String> INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING = new Setting<>(
-        "indices.requests.cache.cleanup.staleness_threshold",
+        INDICES_REQUEST_CACHE_CLEANUP_STALENESS_THRESHOLD_SETTING_KEY,
         "0%",
         IndicesRequestCache::validateStalenessSetting,
+        Property.Dynamic,
         Property.NodeScope
     );
@@ -146,6 +151,7 @@ public final class IndicesRequestCache implements RemovalListener
     private final ICache<ICacheKey<Key>, BytesReference> cache;
+    private final ClusterService clusterService;
     private final Function<ShardId, Optional<CacheEntity>> cacheEntityLookup;
     // pkg-private for testing
     final IndicesRequestCacheCleanupManager cacheCleanupManager;
@@ -167,10 +173,13 @@ public final class IndicesRequestCache implements RemovalListener
         ToLongBiFunction<ICacheKey<Key>, BytesReference> weigher = (k, v) -> k.ramBytesUsed(k.key.ramBytesUsed()) + v.ramBytesUsed();
         this.cacheCleanupManager = new IndicesRequestCacheCleanupManager(
             threadPool,
-            INDICES_REQUEST_CACHE_CLEAN_INTERVAL_SETTING.get(settings),
+            INDICES_REQUEST_CACHE_CLEANUP_INTERVAL_SETTING.get(settings),
             getStalenessThreshold(settings)
         );
         this.cacheEntityLookup = cacheEntityFunction;
+        this.clusterService = clusterService;
+        this.clusterService.getClusterSettings()
+            .addSettingsUpdateConsumer(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING, this::setStalenessThreshold);
         this.cache = cacheService.createCache(
             new CacheConfig.Builder().setSettings(settings)
                 .setWeigher(weigher)
@@ -208,6 +217,11 @@ private double getStalenessThreshold(Settings settings) {
         return RatioValue.parseRatioValue(threshold).getAsRatio();
     }
 
+    // pkg-private for testing
+    void setStalenessThreshold(String threshold) {
+        this.cacheCleanupManager.updateStalenessThreshold(RatioValue.parseRatioValue(threshold).getAsRatio());
+    }
+
     void clear(CacheEntity entity) {
         cacheCleanupManager.enqueueCleanupKey(new CleanupKey(entity, null));
         cacheCleanupManager.forceCleanCache();
@@ -473,7 +487,7 @@ class IndicesRequestCacheCleanupManager implements Closeable {
         private final Set<CleanupKey> keysToClean;
         private final ConcurrentMap<ShardId, HashMap<String, Integer>> cleanupKeyToCountMap;
         private final AtomicInteger staleKeysCount;
-        private final double stalenessThreshold;
+        private volatile double stalenessThreshold;
         private final IndicesRequestCacheCleaner cacheCleaner;
 
         IndicesRequestCacheCleanupManager(ThreadPool threadpool, TimeValue cleanInterval, double stalenessThreshold) {
@@ -485,6 +499,18 @@ class IndicesRequestCacheCleanupManager implements Closeable {
             threadpool.schedule(cacheCleaner, cleanInterval, ThreadPool.Names.SAME);
         }
 
+        void updateStalenessThreshold(double stalenessThreshold) {
+            double oldStalenessThreshold = this.stalenessThreshold;
+            this.stalenessThreshold = stalenessThreshold;
+            if (logger.isDebugEnabled()) {
+                logger.debug(
+                    "Staleness threshold for indices request cache changed to {} from {}",
+                    this.stalenessThreshold,
+                    oldStalenessThreshold
+                );
+            }
+        }
+
         /**
          * Enqueue cleanup key.
         *
@@ -508,7 +534,7 @@ void enqueueCleanupKey(CleanupKey cleanupKey) {
         * @param cleanupKey the CleanupKey to be updated in the map
         */
        private void updateStaleCountOnCacheInsert(CleanupKey cleanupKey) {
-            if (stalenessThreshold == 0.0 || cleanupKey.entity == null) {
+            if (cleanupKey.entity == null) {
                return;
            }
            IndexShard indexShard = (IndexShard) cleanupKey.entity.getCacheIdentity();
@@ -596,7 +622,7 @@ private void updateStaleCountOnEntryRemoval(CleanupKey cleanupKey, RemovalNotifi
         * @param cleanupKey the CleanupKey that has been marked for cleanup
         */
        private void incrementStaleKeysCount(CleanupKey cleanupKey) {
-            if (stalenessThreshold == 0.0 || cleanupKey.entity == null) {
+            if (cleanupKey.entity == null) {
                return;
            }
            IndexShard indexShard = (IndexShard) cleanupKey.entity.getCacheIdentity();
diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java
index 0b607160ac4f5..ce8d23db87c30 100644
--- a/server/src/main/java/org/opensearch/indices/IndicesService.java
+++ b/server/src/main/java/org/opensearch/indices/IndicesService.java
@@ -218,10 +218,11 @@ public class IndicesService extends AbstractLifecycleComponent
         IndicesClusterStateService.AllocatedIndices<IndexShard, IndexService>,
         IndexService.ShardStoreDeleter {
     private static final Logger logger = LogManager.getLogger(IndicesService.class);
+    public static final String INDICES_CACHE_CLEANUP_INTERVAL_SETTING_KEY = "indices.cache.cleanup_interval";
     public static final String INDICES_SHARDS_CLOSED_TIMEOUT = "indices.shards_closed_timeout";
     public static final Setting<TimeValue> INDICES_CACHE_CLEAN_INTERVAL_SETTING = Setting.positiveTimeSetting(
-        "indices.cache.cleanup_interval",
+        INDICES_CACHE_CLEANUP_INTERVAL_SETTING_KEY,
         TimeValue.timeValueMinutes(1),
         Property.NodeScope
     );
diff --git a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java
index 124e8f41289ce..fc306f7c595d6 100644
--- a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java
+++ b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java
@@ -912,6 +912,40 @@ public void testClosingIndexWipesStats() throws Exception {
         IOUtils.close(secondReader);
     }
 
+    public void testCacheCleanupBasedOnStaleThreshold_thresholdUpdate() throws Exception {
+        threadPool = getThreadPool();
+        Settings settings = Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), "51%").build();
+        cache = getIndicesRequestCache(settings);
+
+        writer.addDocument(newDoc(0, "foo"));
+        DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1));
+        DirectoryReader secondReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1));
+
+        // Get 2 entries into the cache
+        cache.getOrCompute(getEntity(indexShard), getLoader(reader), reader, getTermBytes());
+        cache.getOrCompute(getEntity(indexShard), getLoader(secondReader), secondReader, getTermBytes());
+        assertEquals(2, cache.count());
+
+        // Close the reader, to be enqueued for cleanup
+        // 1 out of 2 keys ie 50% are now stale.
+        reader.close();
+        // cache count should not be affected
+        assertEquals(2, cache.count());
+
+        // clean cache with 51% staleness threshold
+        cache.cacheCleanupManager.cleanCache();
+        // cleanup should have been ignored
+        assertEquals(2, cache.count());
+
+        cache.setStalenessThreshold("49%");
+        // clean cache with 49% staleness threshold
+        cache.cacheCleanupManager.cleanCache();
+        // cleanup should NOT have been ignored
+        assertEquals(1, cache.count());
+
+        IOUtils.close(secondReader);
+    }
+
     public void testEviction() throws Exception {
         final ByteSizeValue size;
         {