diff --git a/CHANGELOG.md b/CHANGELOG.md index fed2bb54783a7..b638b63dd52a0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -101,8 +101,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add search query categorizer ([#10255](https://github.com/opensearch-project/OpenSearch/pull/10255)) - Per request phase latency ([#10351](https://github.com/opensearch-project/OpenSearch/issues/10351)) - Add cluster state stats ([#10670](https://github.com/opensearch-project/OpenSearch/pull/10670)) -- [Tiered caching] Defining interfaces, listeners and extending IndicesRequestCache with Tiered cache support ([#10753] - (https://github.com/opensearch-project/OpenSearch/pull/10753)) +- [Tiered caching] Enabling serialization for IndicesRequestCache key object ([#10275](https://github.com/opensearch-project/OpenSearch/pull/10275)) +- [Tiered caching] Defining interfaces, listeners and extending IndicesRequestCache with Tiered cache support ([#10753](https://github.com/opensearch-project/OpenSearch/pull/10753)) - [Remote cluster state] Restore cluster state version during remote state auto restore ([#10853](https://github.com/opensearch-project/OpenSearch/pull/10853)) - Update the indexRandom function to create more segments for concurrent search tests ([10247](https://github.com/opensearch-project/OpenSearch/pull/10247)) - Add support for query profiler with concurrent aggregation ([#9248](https://github.com/opensearch-project/OpenSearch/pull/9248)) @@ -192,6 +192,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Restore support for Java 8 for RestClient ([#11562](https://github.com/opensearch-project/OpenSearch/pull/11562)) - Add deleted doc count in _cat/shards ([#11678](https://github.com/opensearch-project/OpenSearch/pull/11678)) - Capture information for additional query types and aggregation types ([#11582](https://github.com/opensearch-project/OpenSearch/pull/11582)) +- Use slice_size == shard_size heuristic in terms aggs for concurrent segment search and properly calculate the doc_count_error ([#11732](https://github.com/opensearch-project/OpenSearch/pull/11732)) ### Deprecated diff --git a/DEVELOPER_GUIDE.md b/DEVELOPER_GUIDE.md index f9936aad0cf8c..21adbb0305ab1 100644 --- a/DEVELOPER_GUIDE.md +++ b/DEVELOPER_GUIDE.md @@ -183,6 +183,12 @@ Run OpenSearch using `gradlew run`. ./gradlew run ``` +[Plugins](plugins/) may be installed by passing a `-PinstalledPlugins` property: + +```bash +./gradlew run -PinstalledPlugins="['plugin1', 'plugin2']" +``` + That will build OpenSearch and start it, writing its log above Gradle's status message. We log a lot of stuff on startup, specifically these lines tell you that OpenSearch is ready. ``` @@ -578,7 +584,7 @@ explicitly marked by an annotation should not be extended by external implementa any time. The `@DeprecatedApi` annotation could also be added to any classes annotated with `@PublicApi` (or documented as `@opensearch.api`) or their methods that are either changed (with replacement) or planned to be removed across major versions. -The APIs which are designated to be public but have not been stabilized yet should be marked with `@ExperimentalApi` (or documented as `@opensearch.experimental`) +The APIs which are designated to be public but have not been stabilized yet should be marked with `@ExperimentalApi` (or documented as `@opensearch.experimental`) annotation. The presence of this annotation signals that API may change at any time (major, minor or even patch releases). 
In general, the classes annotated with `@PublicApi` may expose other classes or methods annotated with `@ExperimentalApi`, in such cases the backward compatibility guarantees would not apply to latter (see please [Experimental Development](#experimental-development) for more details). diff --git a/build.gradle b/build.gradle index b1cd1d532bfeb..296c30391af09 100644 --- a/build.gradle +++ b/build.gradle @@ -54,7 +54,7 @@ plugins { id 'lifecycle-base' id 'opensearch.docker-support' id 'opensearch.global-build-info' - id "com.diffplug.spotless" version "6.20.0" apply false + id "com.diffplug.spotless" version "6.23.2" apply false id "org.gradle.test-retry" version "1.5.4" apply false id "test-report-aggregation" id 'jacoco-report-aggregation' diff --git a/gradle/run.gradle b/gradle/run.gradle index 639479e97d28f..34651f1d94964 100644 --- a/gradle/run.gradle +++ b/gradle/run.gradle @@ -39,6 +39,12 @@ testClusters { testDistribution = 'archive' if (numZones > 1) numberOfZones = numZones if (numNodes > 1) numberOfNodes = numNodes + if (findProperty("installedPlugins")) { + installedPlugins = Eval.me(installedPlugins) + for (String p : installedPlugins) { + plugin('plugins:'.concat(p)) + } + } } } diff --git a/libs/core/src/main/java/org/opensearch/core/index/shard/ShardId.java b/libs/core/src/main/java/org/opensearch/core/index/shard/ShardId.java index c0abad7ed727f..1e48cf1f476da 100644 --- a/libs/core/src/main/java/org/opensearch/core/index/shard/ShardId.java +++ b/libs/core/src/main/java/org/opensearch/core/index/shard/ShardId.java @@ -32,6 +32,7 @@ package org.opensearch.core.index.shard; +import org.apache.lucene.util.RamUsageEstimator; import org.opensearch.common.annotation.PublicApi; import org.opensearch.core.common.Strings; import org.opensearch.core.common.io.stream.StreamInput; @@ -55,6 +56,8 @@ public class ShardId implements Comparable, ToXContentFragment, Writeab private final int shardId; private final int hashCode; + private final static long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(ShardId.class); + /** * Constructs a new shard id. * @param index the index name @@ -88,6 +91,10 @@ public ShardId(StreamInput in) throws IOException { hashCode = computeHashCode(); } + public long getBaseRamBytesUsed() { + return BASE_RAM_BYTES_USED; + } + /** * Writes this shard id to a stream. 
 * @param out the stream to write to
diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java
index 848f6eddbb0df..51dba07a8f9f8 100644
--- a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java
@@ -636,6 +636,45 @@ public void testProfileDisableCache() throws Exception {
         }
     }
 
+    public void testCacheWithInvalidation() throws Exception {
+        Client client = client();
+        assertAcked(
+            client.admin()
+                .indices()
+                .prepareCreate("index")
+                .setMapping("k", "type=keyword")
+                .setSettings(
+                    Settings.builder()
+                        .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true)
+                        .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
+                        .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
+                )
+                .get()
+        );
+        indexRandom(true, client.prepareIndex("index").setSource("k", "hello"));
+        ensureSearchable("index");
+        SearchResponse resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get();
+        assertSearchResponse(resp);
+        OpenSearchAssertions.assertAllSuccessful(resp);
+        assertThat(resp.getHits().getTotalHits().value, equalTo(1L));
+
+        assertCacheState(client, "index", 0, 1);
+        // Index but don't refresh
+        indexRandom(false, client.prepareIndex("index").setSource("k", "hello2"));
+        resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get();
+        assertSearchResponse(resp);
+        // Should expect a hit here, as no refresh has happened
+        assertCacheState(client, "index", 1, 1);
+
+        // An explicit refresh invalidates the cache
+        refresh();
+        // Hit the same query again
+        resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get();
+        assertSearchResponse(resp);
+        // Should expect a miss, as the key has changed due to a change in IndexReader.CacheKey (caused by the refresh)
+        assertCacheState(client, "index", 1, 2);
+    }
+
     private static void assertCacheState(Client client, String index, long expectedHits, long expectedMisses) {
         RequestCacheStats requestCacheStats = client.admin()
             .indices()
@@ -650,6 +689,7 @@ private static void assertCacheState(Client client, String index, long expectedH
             Arrays.asList(expectedHits, expectedMisses, 0L),
             Arrays.asList(requestCacheStats.getHitCount(), requestCacheStats.getMissCount(), requestCacheStats.getEvictions())
         );
+
     }
 }
diff --git a/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/ShardSizeTermsIT.java b/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/ShardSizeTermsIT.java
index 145830f02ee56..7c7cc12888307 100644
--- a/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/ShardSizeTermsIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/ShardSizeTermsIT.java
@@ -86,6 +86,7 @@ public void testShardSizeEqualsSizeString() throws Exception {
                 terms("keys").field("key")
                     .size(3)
                     .shardSize(3)
+                    .showTermDocCountError(true)
                     .collectMode(randomFrom(SubAggCollectionMode.values()))
                     .order(BucketOrder.count(false))
             )
@@ -98,8 +99,11 @@ public void testShardSizeEqualsSizeString() throws Exception {
         expected.put("1", 8L);
         expected.put("3", 8L);
         expected.put("2", 4L);
+        Long expectedDocCount;
         for (Terms.Bucket bucket : buckets) {
-            assertThat(bucket.getDocCount(),
equalTo(expected.get(bucket.getKeyAsString()))); + expectedDocCount = expected.get(bucket.getKeyAsString()); + // Doc count can vary when using concurrent segment search. See https://github.com/opensearch-project/OpenSearch/issues/11680 + assertTrue((bucket.getDocCount() == expectedDocCount) || bucket.getDocCount() + bucket.getDocCountError() >= expectedDocCount); } } @@ -221,6 +225,7 @@ public void testShardSizeEqualsSizeLong() throws Exception { terms("keys").field("key") .size(3) .shardSize(3) + .showTermDocCountError(true) .collectMode(randomFrom(SubAggCollectionMode.values())) .order(BucketOrder.count(false)) ) @@ -233,8 +238,11 @@ public void testShardSizeEqualsSizeLong() throws Exception { expected.put(1, 8L); expected.put(3, 8L); expected.put(2, 4L); + Long expectedDocCount; for (Terms.Bucket bucket : buckets) { - assertThat(bucket.getDocCount(), equalTo(expected.get(bucket.getKeyAsNumber().intValue()))); + expectedDocCount = expected.get(bucket.getKeyAsNumber().intValue()); + // Doc count can vary when using concurrent segment search. See https://github.com/opensearch-project/OpenSearch/issues/11680 + assertTrue((bucket.getDocCount() == expectedDocCount) || bucket.getDocCount() + bucket.getDocCountError() >= expectedDocCount); } } @@ -355,6 +363,7 @@ public void testShardSizeEqualsSizeDouble() throws Exception { terms("keys").field("key") .size(3) .shardSize(3) + .showTermDocCountError(true) .collectMode(randomFrom(SubAggCollectionMode.values())) .order(BucketOrder.count(false)) ) @@ -367,8 +376,11 @@ public void testShardSizeEqualsSizeDouble() throws Exception { expected.put(1, 8L); expected.put(3, 8L); expected.put(2, 4L); + Long expectedDocCount; for (Terms.Bucket bucket : buckets) { - assertThat(bucket.getDocCount(), equalTo(expected.get(bucket.getKeyAsNumber().intValue()))); + expectedDocCount = expected.get(bucket.getKeyAsNumber().intValue()); + // Doc count can vary when using concurrent segment search. See https://github.com/opensearch-project/OpenSearch/issues/11680 + assertTrue((bucket.getDocCount() == expectedDocCount) || bucket.getDocCount() + bucket.getDocCountError() >= expectedDocCount); } } diff --git a/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/TermsDocCountErrorIT.java b/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/TermsDocCountErrorIT.java index b355ce6d7a8dd..343cea4b94c87 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/TermsDocCountErrorIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/TermsDocCountErrorIT.java @@ -225,8 +225,16 @@ public void setupSuiteScopeCluster() throws Exception { } indexRandom(true, builders); - indexRandomForMultipleSlices("idx"); ensureSearchable(); + + // Force merge each shard down to 1 segment to verify results are the same between concurrent and non-concurrent search paths, else + // for concurrent segment search there will be additional error introduced during the slice level reduce and thus different buckets, + // doc_counts, and doc_count_errors may be returned. This test serves to verify that the doc_count_error is the same between + // concurrent and non-concurrent search in the 1 slice case. TermsFixedDocCountErrorIT verifies that the doc count error is + // correctly calculated for concurrent segment search at the slice level. 
+ // See https://github.com/opensearch-project/OpenSearch/issues/11680" + forceMerge(1); + Thread.sleep(5000); // Sleep 5s to ensure force merge completes } private void assertDocCountErrorWithinBounds(int size, SearchResponse accurateResponse, SearchResponse testResponse) { diff --git a/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/TermsFixedDocCountErrorIT.java b/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/TermsFixedDocCountErrorIT.java new file mode 100644 index 0000000000000..5ad913e8c7086 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/TermsFixedDocCountErrorIT.java @@ -0,0 +1,347 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.aggregations.bucket; + +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; + +import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.search.aggregations.bucket.terms.Terms; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.ParameterizedOpenSearchIntegTestCase; + +import java.util.Arrays; +import java.util.Collection; + +import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING; +import static org.opensearch.search.aggregations.AggregationBuilders.terms; +import static org.opensearch.test.OpenSearchIntegTestCase.Scope.TEST; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; + +@OpenSearchIntegTestCase.ClusterScope(scope = TEST, numClientNodes = 0, maxNumDataNodes = 1, supportsDedicatedMasters = false) +public class TermsFixedDocCountErrorIT extends ParameterizedOpenSearchIntegTestCase { + + private static final String STRING_FIELD_NAME = "s_value"; + + public TermsFixedDocCountErrorIT(Settings dynamicSettings) { + super(dynamicSettings); + } + + @ParametersFactory + public static Collection parameters() { + return Arrays.asList( + new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() }, + new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build() } + ); + } + + @Override + protected Settings featureFlagSettings() { + return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.CONCURRENT_SEGMENT_SEARCH, "true").build(); + } + + public void testSimpleAggErrorMultiShard() throws Exception { + // size = 1, shard_size = 2 + // Shard_1 [A, A, A, A, B, B, C, C, D, D] -> Buckets {"A" : 4, "B" : 2} + // Shard_2 [A, B, B, B, C, C, C, D, D, D] -> Buckets {"B" : 3, "C" : 3} + // coordinator -> Buckets {"B" : 5, "A" : 4} + // Agg error is 4, from (shard_size)th bucket on each shard + // Bucket "A" error is 2, from (shard_size)th bucket on shard_2 + // Bucket "B" error is 0, it's present on both shards + + // size = 1 shard_size = 1 slice_size = 1 + // non-cs / cs + // Shard_1 [A, B, C] + // Shard_2 [B, C, D] + // cs + // Shard_1 slice_1 [A, B, C] -> {a : 1} -> {a : 1 -- error: 1} + // slice_2 [B, C, D] -> {b : 1} + // 
Coordinator should return the same doc count error in both cases + + assertAcked( + prepareCreate("idx_mshard_1").setMapping(STRING_FIELD_NAME, "type=keyword") + .setSettings( + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + ) + ); + client().prepareIndex("idx_mshard_1").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "A").endObject()).get(); + client().prepareIndex("idx_mshard_1").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "A").endObject()).get(); + client().prepareIndex("idx_mshard_1").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "A").endObject()).get(); + client().prepareIndex("idx_mshard_1").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "A").endObject()).get(); + client().prepareIndex("idx_mshard_1").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "B").endObject()).get(); + client().prepareIndex("idx_mshard_1").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "B").endObject()).get(); + client().prepareIndex("idx_mshard_1").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "C").endObject()).get(); + refresh("idx_mshard_1"); + + IndicesSegmentResponse segmentResponse = client().admin().indices().prepareSegments("idx_mshard_1").get(); + assertEquals(1, segmentResponse.getIndices().get("idx_mshard_1").getShards().get(0).getShards()[0].getSegments().size()); + + assertAcked( + prepareCreate("idx_mshard_2").setMapping(STRING_FIELD_NAME, "type=keyword") + .setSettings( + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + ) + ); + client().prepareIndex("idx_mshard_2").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "A").endObject()).get(); + client().prepareIndex("idx_mshard_2").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "B").endObject()).get(); + client().prepareIndex("idx_mshard_2").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "B").endObject()).get(); + client().prepareIndex("idx_mshard_2").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "B").endObject()).get(); + client().prepareIndex("idx_mshard_2").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "C").endObject()).get(); + client().prepareIndex("idx_mshard_2").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "C").endObject()).get(); + refresh("idx_mshard_2"); + + segmentResponse = client().admin().indices().prepareSegments("idx_mshard_2").get(); + assertEquals(1, segmentResponse.getIndices().get("idx_mshard_2").getShards().get(0).getShards()[0].getSegments().size()); + + SearchResponse response = client().prepareSearch("idx_mshard_2", "idx_mshard_1") + .setSize(0) + .addAggregation(terms("terms").field(STRING_FIELD_NAME).showTermDocCountError(true).size(2).shardSize(2)) + .get(); + + Terms terms = response.getAggregations().get("terms"); + assertEquals(2, terms.getBuckets().size()); + assertEquals(4, terms.getDocCountError()); + + Terms.Bucket bucket = terms.getBuckets().get(0); // Bucket "B" + assertEquals("B", bucket.getKey().toString()); + assertEquals(5, bucket.getDocCount()); + assertEquals(0, bucket.getDocCountError()); + + bucket = terms.getBuckets().get(1); // Bucket "A" + assertEquals("A", bucket.getKey().toString()); + assertEquals(4, bucket.getDocCount()); + assertEquals(2, bucket.getDocCountError()); + } + + public void testSimpleAggErrorSingleShard() throws Exception { + assertAcked( + 
prepareCreate("idx_shard_error").setMapping(STRING_FIELD_NAME, "type=keyword") + .setSettings( + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + ) + ); + client().prepareIndex("idx_shard_error").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "A").endObject()).get(); + client().prepareIndex("idx_shard_error").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "A").endObject()).get(); + client().prepareIndex("idx_shard_error").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "A").endObject()).get(); + client().prepareIndex("idx_shard_error").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "A").endObject()).get(); + client().prepareIndex("idx_shard_error").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "A").endObject()).get(); + client().prepareIndex("idx_shard_error").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "A").endObject()).get(); + client().prepareIndex("idx_shard_error").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "B").endObject()).get(); + client().prepareIndex("idx_shard_error").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "B").endObject()).get(); + refresh("idx_shard_error"); + + SearchResponse response = client().prepareSearch("idx_shard_error") + .setSize(0) + .addAggregation(terms("terms").field(STRING_FIELD_NAME).showTermDocCountError(true).size(1).shardSize(2)) + .get(); + + Terms terms = response.getAggregations().get("terms"); + assertEquals(1, terms.getBuckets().size()); + assertEquals(0, terms.getDocCountError()); + + Terms.Bucket bucket = terms.getBuckets().get(0); + assertEquals("A", bucket.getKey().toString()); + assertEquals(6, bucket.getDocCount()); + assertEquals(0, bucket.getDocCountError()); + } + + public void testSliceLevelDocCountErrorSingleShard() throws Exception { + assumeTrue( + "Slice level error is not relevant to non-concurrent search cases", + internalCluster().clusterService().getClusterSettings().get(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING) + ); + + // Slices are created by sorting segments by doc count in descending order then distributing in round robin fashion. + // Creates 2 segments (and therefore 2 slices since slice_count = 2) as follows: + // 1. [A, A, A, B, B, C] + // 2. 
[A, B, B, B, C, C] + // Thus we expect the doc count error for A to be 2 as the nth largest bucket on slice 2 has size 2 + + assertAcked( + prepareCreate("idx_slice_error").setMapping(STRING_FIELD_NAME, "type=keyword") + .setSettings( + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + ) + ); + client().prepareIndex("idx_slice_error").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "A").endObject()).get(); + client().prepareIndex("idx_slice_error").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "A").endObject()).get(); + client().prepareIndex("idx_slice_error").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "A").endObject()).get(); + client().prepareIndex("idx_slice_error").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "B").endObject()).get(); + client().prepareIndex("idx_slice_error").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "B").endObject()).get(); + client().prepareIndex("idx_slice_error").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "C").endObject()).get(); + refresh("idx_slice_error"); + + client().prepareIndex("idx_slice_error").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "A").endObject()).get(); + client().prepareIndex("idx_slice_error").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "B").endObject()).get(); + client().prepareIndex("idx_slice_error").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "B").endObject()).get(); + client().prepareIndex("idx_slice_error").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "B").endObject()).get(); + client().prepareIndex("idx_slice_error").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "C").endObject()).get(); + client().prepareIndex("idx_slice_error").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "C").endObject()).get(); + refresh("idx_slice_error"); + + IndicesSegmentResponse segmentResponse = client().admin().indices().prepareSegments("idx_slice_error").get(); + assertEquals(2, segmentResponse.getIndices().get("idx_slice_error").getShards().get(0).getShards()[0].getSegments().size()); + + // Confirm that there is no error when shard_size == slice_size > cardinality + SearchResponse response = client().prepareSearch("idx_slice_error") + .setSize(0) + .addAggregation(terms("terms").field(STRING_FIELD_NAME).showTermDocCountError(true).size(1).shardSize(4)) + .get(); + + Terms terms = response.getAggregations().get("terms"); + assertEquals(1, terms.getBuckets().size()); + assertEquals(0, terms.getDocCountError()); + + Terms.Bucket bucket = terms.getBuckets().get(0); // Bucket "B" + assertEquals("B", bucket.getKey().toString()); + assertEquals(5, bucket.getDocCount()); + assertEquals(0, bucket.getDocCountError()); + + response = client().prepareSearch("idx_slice_error") + .setSize(0) + .addAggregation(terms("terms").field(STRING_FIELD_NAME).showTermDocCountError(true).size(2).shardSize(2)) + .get(); + + terms = response.getAggregations().get("terms"); + assertEquals(2, terms.getBuckets().size()); + assertEquals(4, terms.getDocCountError()); + + bucket = terms.getBuckets().get(0); // Bucket "B" + assertEquals("B", bucket.getKey().toString()); + assertEquals(5, bucket.getDocCount()); + assertEquals(0, bucket.getDocCountError()); + + bucket = terms.getBuckets().get(1); // Bucket "A" + assertEquals("A", bucket.getKey().toString()); + assertEquals(3, bucket.getDocCount()); + 
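// As reasoned in the comment above, "A" is returned only by slice 1, so its error is the size (2) of slice 2's smallest returned bucket +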
assertEquals(2, bucket.getDocCountError()); + } + + public void testSliceLevelDocCountErrorMultiShard() throws Exception { + assumeTrue( + "Slice level error is not relevant to non-concurrent search cases", + internalCluster().clusterService().getClusterSettings().get(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING) + ); + + // Size = 2, shard_size = 2 + // Shard_1 [A, A, A, A, B, B, C, C] + // slice_1 [A, A, A, B, B, C] {"A" : 3, "B" : 2} + // slice_2 [A, C] {"A" : 1, "C" : 1} + // Shard_1 buckets: {"A" : 4 - error: 0, "B" : 2 - error: 1} + // Shard_2 [A, A, B, B, B, C, C, C] + // slice_1 [A, B, B, B, C, C] {"B" : 3, "C" : 2} + // slice_2 [A, C] {"A" : 1, "C" : 1} + // Shard_2 buckets: {"B" : 3 - error: 1, "C" : 3 - error: 0} + // Overall + // {"B" : 5 - error: 2, "A" : 4 - error: 3} Agg error: 6 + + assertAcked( + prepareCreate("idx_mshard_1").setMapping(STRING_FIELD_NAME, "type=keyword") + .setSettings( + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + ) + ); + client().prepareIndex("idx_mshard_1").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "A").endObject()).get(); + client().prepareIndex("idx_mshard_1").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "A").endObject()).get(); + client().prepareIndex("idx_mshard_1").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "A").endObject()).get(); + client().prepareIndex("idx_mshard_1").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "B").endObject()).get(); + client().prepareIndex("idx_mshard_1").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "B").endObject()).get(); + client().prepareIndex("idx_mshard_1").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "C").endObject()).get(); + refresh("idx_mshard_1"); + + client().prepareIndex("idx_mshard_1").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "A").endObject()).get(); + client().prepareIndex("idx_mshard_1").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "C").endObject()).get(); + refresh("idx_mshard_1"); + + IndicesSegmentResponse segmentResponse = client().admin().indices().prepareSegments("idx_mshard_1").get(); + assertEquals(2, segmentResponse.getIndices().get("idx_mshard_1").getShards().get(0).getShards()[0].getSegments().size()); + + SearchResponse response = client().prepareSearch("idx_mshard_1") + .setSize(0) + .addAggregation(terms("terms").field(STRING_FIELD_NAME).showTermDocCountError(true).size(2).shardSize(2)) + .get(); + + Terms terms = response.getAggregations().get("terms"); + assertEquals(2, terms.getBuckets().size()); + assertEquals(3, terms.getDocCountError()); + + Terms.Bucket bucket = terms.getBuckets().get(0); + assertEquals("A", bucket.getKey().toString()); + assertEquals(4, bucket.getDocCount()); + assertEquals(0, bucket.getDocCountError()); + + bucket = terms.getBuckets().get(1); + assertEquals("B", bucket.getKey().toString()); + assertEquals(2, bucket.getDocCount()); + assertEquals(1, bucket.getDocCountError()); + + assertAcked( + prepareCreate("idx_mshard_2").setMapping(STRING_FIELD_NAME, "type=keyword") + .setSettings( + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + ) + ); + client().prepareIndex("idx_mshard_2").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "A").endObject()).get(); + client().prepareIndex("idx_mshard_2").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, 
"B").endObject()).get(); + client().prepareIndex("idx_mshard_2").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "B").endObject()).get(); + client().prepareIndex("idx_mshard_2").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "B").endObject()).get(); + client().prepareIndex("idx_mshard_2").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "C").endObject()).get(); + client().prepareIndex("idx_mshard_2").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "C").endObject()).get(); + refresh("idx_mshard_2"); + + client().prepareIndex("idx_mshard_2").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "A").endObject()).get(); + client().prepareIndex("idx_mshard_2").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "C").endObject()).get(); + refresh("idx_mshard_2"); + + segmentResponse = client().admin().indices().prepareSegments("idx_mshard_2").get(); + assertEquals(2, segmentResponse.getIndices().get("idx_mshard_2").getShards().get(0).getShards()[0].getSegments().size()); + + response = client().prepareSearch("idx_mshard_2") + .setSize(0) + .addAggregation(terms("terms").field(STRING_FIELD_NAME).showTermDocCountError(true).size(2).shardSize(2)) + .get(); + + terms = response.getAggregations().get("terms"); + assertEquals(2, terms.getBuckets().size()); + assertEquals(3, terms.getDocCountError()); + + bucket = terms.getBuckets().get(0); + assertEquals("B", bucket.getKey().toString()); + assertEquals(3, bucket.getDocCount()); + assertEquals(1, bucket.getDocCountError()); + + bucket = terms.getBuckets().get(1); + assertEquals("C", bucket.getKey().toString()); + assertEquals(3, bucket.getDocCount()); + assertEquals(0, bucket.getDocCountError()); + + response = client().prepareSearch("idx_mshard_2", "idx_mshard_1") + .setSize(0) + .addAggregation(terms("terms").field(STRING_FIELD_NAME).showTermDocCountError(true).size(2).shardSize(2)) + .get(); + + terms = response.getAggregations().get("terms"); + assertEquals(2, terms.getBuckets().size()); + assertEquals(6, terms.getDocCountError()); + + bucket = terms.getBuckets().get(0); + assertEquals("B", bucket.getKey().toString()); + assertEquals(5, bucket.getDocCount()); + assertEquals(2, bucket.getDocCountError()); + + bucket = terms.getBuckets().get(1); + assertEquals("A", bucket.getKey().toString()); + assertEquals(4, bucket.getDocCount()); + assertEquals(3, bucket.getDocCountError()); + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/TermsShardMinDocCountIT.java b/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/TermsShardMinDocCountIT.java index b0d8e7ea02e8f..3851b16551795 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/TermsShardMinDocCountIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/TermsShardMinDocCountIT.java @@ -88,6 +88,10 @@ private static String randomExecutionHint() { // see https://github.com/elastic/elasticsearch/issues/5998 public void testShardMinDocCountSignificantTermsTest() throws Exception { + assumeFalse( + "For concurrent segment search shard_min_doc_count is not enforced at the slice level. 
See https://github.com/opensearch-project/OpenSearch/issues/11847", + internalCluster().clusterService().getClusterSettings().get(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING) + ); String textMappings; if (randomBoolean()) { textMappings = "type=long"; @@ -157,6 +161,10 @@ private void addTermsDocs(String term, int numInClass, int numNotInClass, List getKey()).orElse(null)); + } + + @Override + public CacheKey getKey() { + return this.cacheHelper.getKey(); + } + + public DelegatingCacheKey getDelegatingCacheKey() { + return this.serializableCacheKey; + } + + @Override + public void addClosedListener(ClosedListener listener) { + this.cacheHelper.addClosedListener(listener); + } + } + + /** + * Wraps internal IndexReader.CacheKey and attaches a uniqueId to it which can be eventually be used instead of + * object itself for serialization purposes. + */ + public class DelegatingCacheKey { + private final CacheKey cacheKey; + private final String uniqueId; + + DelegatingCacheKey(CacheKey cacheKey) { + this.cacheKey = cacheKey; + this.uniqueId = UUID.randomUUID().toString(); + } + + public CacheKey getCacheKey() { + return this.cacheKey; + } + + public String getId() { + return uniqueId; + } } @Override diff --git a/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java b/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java index d3285c379bcc4..e19f8e8370d5b 100644 --- a/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java @@ -11,9 +11,6 @@ import org.opensearch.common.settings.Setting.Property; import org.opensearch.common.util.FeatureFlags; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashSet; import java.util.Set; /** @@ -32,17 +29,12 @@ protected FeatureFlagSettings( super(settings, settingsSet, settingUpgraders, scope); } - public static final Set> BUILT_IN_FEATURE_FLAGS = Collections.unmodifiableSet( - new HashSet<>( - Arrays.asList( - FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL_SETTING, - FeatureFlags.EXTENSIONS_SETTING, - FeatureFlags.IDENTITY_SETTING, - FeatureFlags.CONCURRENT_SEGMENT_SEARCH_SETTING, - FeatureFlags.TELEMETRY_SETTING, - FeatureFlags.DATETIME_FORMATTER_CACHING_SETTING, - FeatureFlags.WRITEABLE_REMOTE_INDEX_SETTING - ) - ) + public static final Set> BUILT_IN_FEATURE_FLAGS = Set.of( + FeatureFlags.EXTENSIONS_SETTING, + FeatureFlags.IDENTITY_SETTING, + FeatureFlags.CONCURRENT_SEGMENT_SEARCH_SETTING, + FeatureFlags.TELEMETRY_SETTING, + FeatureFlags.DATETIME_FORMATTER_CACHING_SETTING, + FeatureFlags.WRITEABLE_REMOTE_INDEX_SETTING ); } diff --git a/server/src/main/java/org/opensearch/common/util/FeatureFlags.java b/server/src/main/java/org/opensearch/common/util/FeatureFlags.java index c54772caa574b..d4ab161527cc0 100644 --- a/server/src/main/java/org/opensearch/common/util/FeatureFlags.java +++ b/server/src/main/java/org/opensearch/common/util/FeatureFlags.java @@ -20,12 +20,6 @@ * @opensearch.internal */ public class FeatureFlags { - /** - * Gates the visibility of the segment replication experimental features that allows users to test unreleased beta features. - */ - public static final String SEGMENT_REPLICATION_EXPERIMENTAL = - "opensearch.experimental.feature.segment_replication_experimental.enabled"; - /** * Gates the ability for Searchable Snapshots to read snapshots that are older than the * guaranteed backward compatibility for OpenSearch (one prior major version) on a best effort basis. 
@@ -105,12 +99,6 @@ public static boolean isEnabled(Setting<Boolean> featureFlag) {
         }
     }
 
-    public static final Setting<Boolean> SEGMENT_REPLICATION_EXPERIMENTAL_SETTING = Setting.boolSetting(
-        SEGMENT_REPLICATION_EXPERIMENTAL,
-        false,
-        Property.NodeScope
-    );
-
     public static final Setting<Boolean> EXTENSIONS_SETTING = Setting.boolSetting(EXTENSIONS, false, Property.NodeScope);
 
     public static final Setting<Boolean> IDENTITY_SETTING = Setting.boolSetting(IDENTITY, false, Property.NodeScope);
diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java
index 4a19f8eb8714d..6d5c23274dbd6 100644
--- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java
+++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java
@@ -51,7 +51,12 @@
 import org.opensearch.common.unit.TimeValue;
 import org.opensearch.common.util.concurrent.ConcurrentCollections;
 import org.opensearch.core.common.bytes.BytesReference;
+import org.opensearch.core.common.io.stream.StreamInput;
+import org.opensearch.core.common.io.stream.StreamOutput;
+import org.opensearch.core.common.io.stream.Writeable;
 import org.opensearch.core.common.unit.ByteSizeValue;
+import org.opensearch.core.index.shard.ShardId;
+import org.opensearch.index.shard.IndexShard;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -60,8 +65,10 @@
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
+import java.util.function.Function;
 
 /**
  * The indices request cache allows to cache a shard level request stage responses, helping with improving
@@ -103,13 +110,16 @@ public final class IndicesRequestCache implements RemovalListener<IndicesRequestCache.Key, BytesReference>, Closeable {
 
     private final ConcurrentMap<CleanupKey, Boolean> registeredClosedListeners = ConcurrentCollections.newConcurrentMap();
     private final Set<CleanupKey> keysToClean = ConcurrentCollections.newConcurrentSet();
     private final ByteSizeValue size;
     private final TimeValue expire;
     private final Cache<Key, BytesReference> cache;
+    private final Function<ShardId, Optional<CacheEntity>> cacheEntityLookup;
 
-    IndicesRequestCache(Settings settings) {
+    IndicesRequestCache(Settings settings, Function<ShardId, Optional<CacheEntity>> cacheEntityFunction) {
         this.size = INDICES_CACHE_QUERY_SIZE.get(settings);
         this.expire = INDICES_CACHE_QUERY_EXPIRE.exists(settings) ? INDICES_CACHE_QUERY_EXPIRE.get(settings) : null;
         long sizeInBytes = size.getBytes();
@@ -121,6 +131,7 @@ public final class IndicesRequestCache implements RemovalListener<IndicesRequestCache.Key, BytesReference>, Closeable {
         cache = cacheBuilder.build();
+        this.cacheEntityLookup = cacheEntityFunction;
     }
 
     @Override
     public void onRemoval(RemovalNotification<Key, BytesReference> notification) {
-        notification.getKey().entity.onRemoval(notification);
+        // In case this event happens for an old shard, we can safely ignore it, as we don't keep track of old
+        // shards as part of the request cache.
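+        // (The lookup below returns Optional.empty() once the shard's IndexService is gone, making the removal a no-op.)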
+ cacheEntityLookup.apply(notification.getKey().shardId).ifPresent(entity -> entity.onRemoval(notification)); } BytesReference getOrCompute( - CacheEntity cacheEntity, + IndicesService.IndexShardCacheEntity cacheEntity, CheckedSupplier loader, DirectoryReader reader, BytesReference cacheKey ) throws Exception { assert reader.getReaderCacheHelper() != null; - final Key key = new Key(cacheEntity, reader.getReaderCacheHelper().getKey(), cacheKey); + assert reader.getReaderCacheHelper() instanceof OpenSearchDirectoryReader.DelegatingCacheHelper; + + OpenSearchDirectoryReader.DelegatingCacheHelper delegatingCacheHelper = (OpenSearchDirectoryReader.DelegatingCacheHelper) reader + .getReaderCacheHelper(); + String readerCacheKeyId = delegatingCacheHelper.getDelegatingCacheKey().getId(); + assert readerCacheKeyId != null; + final Key key = new Key(((IndexShard) cacheEntity.getCacheIdentity()).shardId(), cacheKey, readerCacheKeyId); Loader cacheLoader = new Loader(cacheEntity, loader); BytesReference value = cache.computeIfAbsent(key, cacheLoader); if (cacheLoader.isLoaded()) { - key.entity.onMiss(); + cacheEntity.onMiss(); // see if its the first time we see this reader, and make sure to register a cleanup key - CleanupKey cleanupKey = new CleanupKey(cacheEntity, reader.getReaderCacheHelper().getKey()); + CleanupKey cleanupKey = new CleanupKey(cacheEntity, readerCacheKeyId); if (!registeredClosedListeners.containsKey(cleanupKey)) { Boolean previous = registeredClosedListeners.putIfAbsent(cleanupKey, Boolean.TRUE); if (previous == null) { @@ -159,7 +178,7 @@ BytesReference getOrCompute( } } } else { - key.entity.onHit(); + cacheEntity.onHit(); } return value; } @@ -170,9 +189,14 @@ BytesReference getOrCompute( * @param reader the reader to invalidate the cache entry for * @param cacheKey the cache key to invalidate */ - void invalidate(CacheEntity cacheEntity, DirectoryReader reader, BytesReference cacheKey) { + void invalidate(IndicesService.IndexShardCacheEntity cacheEntity, DirectoryReader reader, BytesReference cacheKey) { assert reader.getReaderCacheHelper() != null; - cache.invalidate(new Key(cacheEntity, reader.getReaderCacheHelper().getKey(), cacheKey)); + String readerCacheKeyId = null; + if (reader instanceof OpenSearchDirectoryReader) { + IndexReader.CacheHelper cacheHelper = ((OpenSearchDirectoryReader) reader).getDelegatingCacheHelper(); + readerCacheKeyId = ((OpenSearchDirectoryReader.DelegatingCacheHelper) cacheHelper).getDelegatingCacheKey().getId(); + } + cache.invalidate(new Key(((IndexShard) cacheEntity.getCacheIdentity()).shardId(), cacheKey, readerCacheKeyId)); } /** @@ -240,6 +264,7 @@ interface CacheEntity extends Accountable { * Called when this entity instance is removed */ void onRemoval(RemovalNotification notification); + } /** @@ -247,22 +272,26 @@ interface CacheEntity extends Accountable { * * @opensearch.internal */ - public static class Key implements Accountable { - private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(Key.class); - - public final CacheEntity entity; // use as identity equality - public final IndexReader.CacheKey readerCacheKey; + static class Key implements Accountable, Writeable { + public final ShardId shardId; // use as identity equality + public final String readerCacheKeyId; public final BytesReference value; - Key(CacheEntity entity, IndexReader.CacheKey readerCacheKey, BytesReference value) { - this.entity = entity; - this.readerCacheKey = Objects.requireNonNull(readerCacheKey); + Key(ShardId shardId, 
BytesReference value, String readerCacheKeyId) { + this.shardId = shardId; this.value = value; + this.readerCacheKeyId = Objects.requireNonNull(readerCacheKeyId); + } + + Key(StreamInput in) throws IOException { + this.shardId = in.readOptionalWriteable(ShardId::new); + this.readerCacheKeyId = in.readOptionalString(); + this.value = in.readBytesReference(); } @Override public long ramBytesUsed() { - return BASE_RAM_BYTES_USED + entity.ramBytesUsed() + value.length(); + return BASE_RAM_BYTES_USED + shardId.getBaseRamBytesUsed() + value.length(); } @Override @@ -276,28 +305,35 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; Key key = (Key) o; - if (Objects.equals(readerCacheKey, key.readerCacheKey) == false) return false; - if (!entity.getCacheIdentity().equals(key.entity.getCacheIdentity())) return false; + if (!Objects.equals(readerCacheKeyId, key.readerCacheKeyId)) return false; + if (!shardId.equals(key.shardId)) return false; if (!value.equals(key.value)) return false; return true; } @Override public int hashCode() { - int result = entity.getCacheIdentity().hashCode(); - result = 31 * result + readerCacheKey.hashCode(); + int result = shardId.hashCode(); + result = 31 * result + readerCacheKeyId.hashCode(); result = 31 * result + value.hashCode(); return result; } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeOptionalWriteable(shardId); + out.writeOptionalString(readerCacheKeyId); + out.writeBytesReference(value); + } } private class CleanupKey implements IndexReader.ClosedListener { final CacheEntity entity; - final IndexReader.CacheKey readerCacheKey; + final String readerCacheKeyId; - private CleanupKey(CacheEntity entity, IndexReader.CacheKey readerCacheKey) { + private CleanupKey(CacheEntity entity, String readerCacheKeyId) { this.entity = entity; - this.readerCacheKey = readerCacheKey; + this.readerCacheKeyId = readerCacheKeyId; } @Override @@ -315,7 +351,7 @@ public boolean equals(Object o) { return false; } CleanupKey that = (CleanupKey) o; - if (Objects.equals(readerCacheKey, that.readerCacheKey) == false) return false; + if (!Objects.equals(readerCacheKeyId, that.readerCacheKeyId)) return false; if (!entity.getCacheIdentity().equals(that.entity.getCacheIdentity())) return false; return true; } @@ -323,7 +359,7 @@ public boolean equals(Object o) { @Override public int hashCode() { int result = entity.getCacheIdentity().hashCode(); - result = 31 * result + Objects.hashCode(readerCacheKey); + result = 31 * result + Objects.hashCode(readerCacheKeyId); return result; } } @@ -339,9 +375,9 @@ synchronized void cleanCache() { for (Iterator iterator = keysToClean.iterator(); iterator.hasNext();) { CleanupKey cleanupKey = iterator.next(); iterator.remove(); - if (cleanupKey.readerCacheKey == null || cleanupKey.entity.isOpen() == false) { + if (cleanupKey.readerCacheKeyId == null || !cleanupKey.entity.isOpen()) { // null indicates full cleanup, as does a closed shard - currentFullClean.add(cleanupKey.entity.getCacheIdentity()); + currentFullClean.add(((IndexShard) cleanupKey.entity.getCacheIdentity()).shardId()); } else { currentKeysToClean.add(cleanupKey); } @@ -349,10 +385,13 @@ synchronized void cleanCache() { if (!currentKeysToClean.isEmpty() || !currentFullClean.isEmpty()) { for (Iterator iterator = cache.keys().iterator(); iterator.hasNext();) { Key key = iterator.next(); - if (currentFullClean.contains(key.entity.getCacheIdentity())) { + if 
(currentFullClean.contains(key.shardId)) {
                     iterator.remove();
                 } else {
-                    if (currentKeysToClean.contains(new CleanupKey(key.entity, key.readerCacheKey))) {
+                    // If the flow reaches here, then we should have an open shard available on the node.
+                    if (currentKeysToClean.contains(
+                        new CleanupKey(cacheEntityLookup.apply(key.shardId).orElse(null), key.readerCacheKeyId)
+                    )) {
                         iterator.remove();
                     }
                 }
diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java
index 96dc8b6763994..bb8c9534e292a 100644
--- a/server/src/main/java/org/opensearch/indices/IndicesService.java
+++ b/server/src/main/java/org/opensearch/indices/IndicesService.java
@@ -410,7 +410,13 @@ public IndicesService(
         this.shardsClosedTimeout = settings.getAsTime(INDICES_SHARDS_CLOSED_TIMEOUT, new TimeValue(1, TimeUnit.DAYS));
         this.analysisRegistry = analysisRegistry;
         this.indexNameExpressionResolver = indexNameExpressionResolver;
-        this.indicesRequestCache = new IndicesRequestCache(settings);
+        this.indicesRequestCache = new IndicesRequestCache(settings, (shardId -> {
+            IndexService indexService = this.indices.get(shardId.getIndex().getUUID());
+            if (indexService == null) {
+                return Optional.empty();
+            }
+            return Optional.of(new IndexShardCacheEntity(indexService.getShard(shardId.id())));
+        }));
         this.indicesQueryCache = new IndicesQueryCache(settings);
         this.mapperRegistry = mapperRegistry;
         this.namedWriteableRegistry = namedWriteableRegistry;
@@ -1751,7 +1757,6 @@ private BytesReference cacheShardLevelResult(
         BytesReference cacheKey,
         CheckedConsumer<StreamOutput, IOException> loader
     ) throws Exception {
-        IndexShardCacheEntity cacheEntity = new IndexShardCacheEntity(shard);
         CheckedSupplier<BytesReference, IOException> supplier = () -> {
             /* BytesStreamOutput allows to pass the expected size but by default uses
              * BigArrays.PAGE_SIZE_IN_BYTES which is 16k. A common cached result ie.
@@ -1768,7 +1773,7 @@ private BytesReference cacheShardLevelResult( return out.bytes(); } }; - return indicesRequestCache.getOrCompute(cacheEntity, supplier, reader, cacheKey); + return indicesRequestCache.getOrCompute(new IndexShardCacheEntity(shard), supplier, reader, cacheKey); } /** @@ -1776,11 +1781,12 @@ private BytesReference cacheShardLevelResult( * * @opensearch.internal */ - static final class IndexShardCacheEntity extends AbstractIndexShardCacheEntity { + public static class IndexShardCacheEntity extends AbstractIndexShardCacheEntity { + private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(IndexShardCacheEntity.class); private final IndexShard indexShard; - protected IndexShardCacheEntity(IndexShard indexShard) { + public IndexShardCacheEntity(IndexShard indexShard) { this.indexShard = indexShard; } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalTerms.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalTerms.java index 0e773291881cf..b8f9406ff55b9 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalTerms.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalTerms.java @@ -225,6 +225,7 @@ public int hashCode() { protected final int requiredSize; protected final long minDocCount; protected final TermsAggregator.BucketCountThresholds bucketCountThresholds; + private boolean hasSliceLevelDocCountError = false; /** * Creates a new {@link InternalTerms} @@ -299,9 +300,7 @@ private BucketOrder getReduceOrder(List aggregations) { private long getDocCountError(InternalTerms terms, ReduceContext reduceContext) { int size = terms.getBuckets().size(); - // doc_count_error is always computed at the coordinator based on the buckets returned by the shards. This should be 0 during the - // shard level reduce as no buckets are being pruned at this stage. - if (reduceContext.isSliceLevel() || size == 0 || size < terms.getShardSize() || isKeyOrder(terms.order)) { + if (size == 0 || size < terms.getShardSize() || isKeyOrder(terms.order)) { return 0; } else if (InternalOrder.isCountDesc(terms.order)) { if (terms.getDocCountError() > 0) { @@ -398,6 +397,12 @@ public InternalAggregation reduce(List aggregations, Reduce for (InternalAggregation aggregation : aggregations) { @SuppressWarnings("unchecked") InternalTerms terms = (InternalTerms) aggregation; + // For Concurrent Segment Search the aggregation will have a computed doc count error coming from the shards. + // We use the existence of this doc count error to determine whether or not doc count error originated from the slice level + // and if so we will maintain the doc count error for the 1 shard case at the coordinator level + if (aggregations.size() == 1 && terms.getDocCountError() > 0) { + hasSliceLevelDocCountError = true; + } if (referenceTerms == null && aggregation.getClass().equals(UnmappedTerms.class) == false) { referenceTerms = terms; } @@ -500,7 +505,11 @@ For backward compatibility, we disable the merge sort and use ({@link InternalTe if (sumDocCountError == -1) { docCountError = -1; } else { - docCountError = aggregations.size() == 1 ? 0 : sumDocCountError; + if (hasSliceLevelDocCountError) { + docCountError = sumDocCountError; + } else { + docCountError = aggregations.size() == 1 ? 
0 : sumDocCountError; + } } // Shards must return buckets sorted by key, so we apply the sort here in shard level reduce @@ -512,7 +521,7 @@ For backward compatibility, we disable the merge sort and use ({@link InternalTe @Override protected B reduceBucket(List buckets, ReduceContext context) { - assert buckets.size() > 0; + assert !buckets.isEmpty(); long docCount = 0; // For the per term doc count error we add up the errors from the // shards that did not respond with the term. To do this we add up @@ -523,7 +532,7 @@ protected B reduceBucket(List buckets, ReduceContext context) { for (B bucket : buckets) { docCount += bucket.getDocCount(); if (docCountError != -1) { - if (bucket.showDocCountError() == false || bucket.getDocCountError() == -1) { + if (bucket.showDocCountError() == false) { docCountError = -1; } else { docCountError += bucket.getDocCountError(); diff --git a/server/src/main/java/org/opensearch/search/internal/SearchContext.java b/server/src/main/java/org/opensearch/search/internal/SearchContext.java index cc43f4e5d79fb..02837da64dafd 100644 --- a/server/src/main/java/org/opensearch/search/internal/SearchContext.java +++ b/server/src/main/java/org/opensearch/search/internal/SearchContext.java @@ -86,8 +86,6 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; -import static org.opensearch.search.aggregations.bucket.BucketUtils.suggestShardSideQueueSize; - /** * This class encapsulates the state needed to execute a search. It holds a reference to the * shards point in time snapshot (IndexReader / ContextIndexSearcher) and allows passing on @@ -410,11 +408,10 @@ public boolean shouldUseConcurrentSearch() { * Returns local bucket count thresholds based on concurrent segment search status */ public LocalBucketCountThresholds asLocalBucketCountThresholds(TermsAggregator.BucketCountThresholds bucketCountThresholds) { - if (shouldUseConcurrentSearch()) { - return new LocalBucketCountThresholds(0, suggestShardSideQueueSize(bucketCountThresholds.getShardSize())); - } else { - return new LocalBucketCountThresholds(bucketCountThresholds.getShardMinDocCount(), bucketCountThresholds.getShardSize()); - } + return new LocalBucketCountThresholds( + shouldUseConcurrentSearch() ? 
0 : bucketCountThresholds.getShardMinDocCount(), + bucketCountThresholds.getShardSize() + ); } /** diff --git a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java index 8494259c8fd8a..73728aec12e51 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java @@ -52,23 +52,36 @@ import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.common.bytes.AbstractBytesReference; import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.xcontent.MediaTypeRegistry; import org.opensearch.core.xcontent.XContentHelper; +import org.opensearch.index.IndexNotFoundException; +import org.opensearch.index.IndexService; +import org.opensearch.index.cache.request.RequestCacheStats; import org.opensearch.index.cache.request.ShardRequestCache; import org.opensearch.index.query.TermQueryBuilder; -import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.IndexShardState; +import org.opensearch.test.OpenSearchSingleNodeTestCase; import java.io.IOException; import java.util.Arrays; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.Optional; +import java.util.UUID; -public class IndicesRequestCacheTests extends OpenSearchTestCase { +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class IndicesRequestCacheTests extends OpenSearchSingleNodeTestCase { public void testBasicOperationsCache() throws Exception { - ShardRequestCache requestCacheStats = new ShardRequestCache(); - IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY); + IndexShard indexShard = createIndex("test").getShard(0); + IndicesRequestCache cache = new IndicesRequestCache( + Settings.EMPTY, + (shardId -> Optional.of(new IndicesService.IndexShardCacheEntity(indexShard))) + ); Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); @@ -76,13 +89,13 @@ public void testBasicOperationsCache() throws Exception { DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1)); TermQueryBuilder termQuery = new TermQueryBuilder("id", "0"); BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false); - AtomicBoolean indexShard = new AtomicBoolean(true); // initial cache - TestEntity entity = new TestEntity(requestCacheStats, indexShard); + IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard); Loader loader = new Loader(reader, 0); BytesReference value = cache.getOrCompute(entity, loader, reader, termBytes); assertEquals("foo", value.streamInput().readString()); + ShardRequestCache requestCacheStats = indexShard.requestCache(); assertEquals(0, requestCacheStats.stats().getHitCount()); assertEquals(1, requestCacheStats.stats().getMissCount()); assertEquals(0, requestCacheStats.stats().getEvictions()); @@ -90,10 +103,11 @@ public void testBasicOperationsCache() throws Exception { assertEquals(1, cache.count()); // cache hit - entity = new TestEntity(requestCacheStats, indexShard); + entity = new IndicesService.IndexShardCacheEntity(indexShard); loader 
         Directory dir = newDirectory();
         IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig());

@@ -76,13 +89,13 @@ public void testBasicOperationsCache() throws Exception {
         DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1));
         TermQueryBuilder termQuery = new TermQueryBuilder("id", "0");
         BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false);
-        AtomicBoolean indexShard = new AtomicBoolean(true);

         // initial cache
-        TestEntity entity = new TestEntity(requestCacheStats, indexShard);
+        IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard);
         Loader loader = new Loader(reader, 0);
         BytesReference value = cache.getOrCompute(entity, loader, reader, termBytes);
         assertEquals("foo", value.streamInput().readString());
+        ShardRequestCache requestCacheStats = indexShard.requestCache();
         assertEquals(0, requestCacheStats.stats().getHitCount());
         assertEquals(1, requestCacheStats.stats().getMissCount());
         assertEquals(0, requestCacheStats.stats().getEvictions());
@@ -90,10 +103,11 @@ public void testBasicOperationsCache() throws Exception {
         assertEquals(1, cache.count());

         // cache hit
-        entity = new TestEntity(requestCacheStats, indexShard);
+        entity = new IndicesService.IndexShardCacheEntity(indexShard);
         loader = new Loader(reader, 0);
         value = cache.getOrCompute(entity, loader, reader, termBytes);
         assertEquals("foo", value.streamInput().readString());
+        requestCacheStats = indexShard.requestCache();
         assertEquals(1, requestCacheStats.stats().getHitCount());
         assertEquals(1, requestCacheStats.stats().getMissCount());
         assertEquals(0, requestCacheStats.stats().getEvictions());
@@ -106,7 +120,7 @@ public void testBasicOperationsCache() throws Exception {
         if (randomBoolean()) {
             reader.close();
         } else {
-            indexShard.set(false); // closed shard but reader is still open
+            indexShard.close("test", true, true); // closed shard but reader is still open
             cache.clear(entity);
         }
         cache.cleanCache();
@@ -122,9 +136,17 @@ public void testBasicOperationsCache() throws Exception {
     }

     public void testCacheDifferentReaders() throws Exception {
-        IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY);
-        AtomicBoolean indexShard = new AtomicBoolean(true);
-        ShardRequestCache requestCacheStats = new ShardRequestCache();
+        IndicesService indicesService = getInstanceFromNode(IndicesService.class);
+        IndexShard indexShard = createIndex("test").getShard(0);
+        IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, (shardId -> {
+            IndexService indexService = null;
+            try {
+                indexService = indicesService.indexServiceSafe(shardId.getIndex());
+            } catch (IndexNotFoundException ex) {
+                return Optional.empty();
+            }
+            return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id())));
+        }));
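+        // indexServiceSafe throws IndexNotFoundException once an index is deleted,
+        // so the resolver degrades to Optional.empty() instead of failing the lookup.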
         Directory dir = newDirectory();
         IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig());

@@ -141,9 +163,10 @@ public void testCacheDifferentReaders() throws Exception {
         DirectoryReader secondReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1));

         // initial cache
-        TestEntity entity = new TestEntity(requestCacheStats, indexShard);
+        IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard);
         Loader loader = new Loader(reader, 0);
         BytesReference value = cache.getOrCompute(entity, loader, reader, termBytes);
+        ShardRequestCache requestCacheStats = entity.stats();
         assertEquals("foo", value.streamInput().readString());
         assertEquals(0, requestCacheStats.stats().getHitCount());
         assertEquals(1, requestCacheStats.stats().getMissCount());
@@ -155,9 +178,10 @@ public void testCacheDifferentReaders() throws Exception {
         assertEquals(1, cache.numRegisteredCloseListeners());

         // cache the second
-        TestEntity secondEntity = new TestEntity(requestCacheStats, indexShard);
+        IndicesService.IndexShardCacheEntity secondEntity = new IndicesService.IndexShardCacheEntity(indexShard);
         loader = new Loader(secondReader, 0);
         value = cache.getOrCompute(entity, loader, secondReader, termBytes);
+        requestCacheStats = entity.stats();
         assertEquals("bar", value.streamInput().readString());
         assertEquals(0, requestCacheStats.stats().getHitCount());
         assertEquals(2, requestCacheStats.stats().getMissCount());
@@ -167,9 +191,10 @@ public void testCacheDifferentReaders() throws Exception {
         assertTrue(requestCacheStats.stats().getMemorySize().bytesAsInt() > cacheSize + value.length());
         assertEquals(2, cache.numRegisteredCloseListeners());

-        secondEntity = new TestEntity(requestCacheStats, indexShard);
+        secondEntity = new IndicesService.IndexShardCacheEntity(indexShard);
         loader = new Loader(secondReader, 0);
         value = cache.getOrCompute(secondEntity, loader, secondReader, termBytes);
+        requestCacheStats = entity.stats();
         assertEquals("bar", value.streamInput().readString());
         assertEquals(1, requestCacheStats.stats().getHitCount());
         assertEquals(2, requestCacheStats.stats().getMissCount());
@@ -177,10 +202,11 @@ public void testCacheDifferentReaders() throws Exception {
         assertTrue(loader.loadedFromCache);
         assertEquals(2, cache.count());

-        entity = new TestEntity(requestCacheStats, indexShard);
+        entity = new IndicesService.IndexShardCacheEntity(indexShard);
         loader = new Loader(reader, 0);
         value = cache.getOrCompute(entity, loader, reader, termBytes);
         assertEquals("foo", value.streamInput().readString());
+        requestCacheStats = entity.stats();
         assertEquals(2, requestCacheStats.stats().getHitCount());
         assertEquals(2, requestCacheStats.stats().getMissCount());
         assertEquals(0, requestCacheStats.stats().getEvictions());
@@ -201,7 +227,7 @@ public void testCacheDifferentReaders() throws Exception {
         if (randomBoolean()) {
             secondReader.close();
         } else {
-            indexShard.set(false); // closed shard but reader is still open
+            indexShard.close("test", true, true); // closed shard but reader is still open
             cache.clear(secondEntity);
         }
         cache.cleanCache();
@@ -218,9 +244,11 @@ public void testCacheDifferentReaders() throws Exception {

     public void testEviction() throws Exception {
         final ByteSizeValue size;
         {
-            IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY);
-            AtomicBoolean indexShard = new AtomicBoolean(true);
-            ShardRequestCache requestCacheStats = new ShardRequestCache();
+            IndexShard indexShard = createIndex("test").getShard(0);
+            IndicesRequestCache cache = new IndicesRequestCache(
+                Settings.EMPTY,
+                (shardId -> Optional.of(new IndicesService.IndexShardCacheEntity(indexShard)))
+            );
             Directory dir = newDirectory();
             IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig());

@@ -228,26 +256,26 @@ public void testEviction() throws Exception {
             DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1));
             TermQueryBuilder termQuery = new TermQueryBuilder("id", "0");
             BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false);
-            TestEntity entity = new TestEntity(requestCacheStats, indexShard);
+            IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard);
             Loader loader = new Loader(reader, 0);
             writer.updateDocument(new Term("id", "0"), newDoc(0, "bar"));
             DirectoryReader secondReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1));
-            TestEntity secondEntity = new TestEntity(requestCacheStats, indexShard);
+            IndicesService.IndexShardCacheEntity secondEntity = new IndicesService.IndexShardCacheEntity(indexShard);
             Loader secondLoader = new Loader(secondReader, 0);

             BytesReference value1 = cache.getOrCompute(entity, loader, reader, termBytes);
             assertEquals("foo", value1.streamInput().readString());
             BytesReference value2 = cache.getOrCompute(secondEntity, secondLoader, secondReader, termBytes);
             assertEquals("bar", value2.streamInput().readString());
-            size = requestCacheStats.stats().getMemorySize();
+            size = indexShard.requestCache().stats().getMemorySize();
             IOUtils.close(reader, secondReader, writer, dir, cache);
         }
+        IndexShard indexShard = createIndex("test1").getShard(0);
         IndicesRequestCache cache = new IndicesRequestCache(
-            Settings.builder().put(IndicesRequestCache.INDICES_CACHE_QUERY_SIZE.getKey(), size.getBytes() + 1 + "b").build()
+            Settings.builder().put(IndicesRequestCache.INDICES_CACHE_QUERY_SIZE.getKey(), size.getBytes() + 1 + "b").build(),
+            (shardId -> Optional.of(new IndicesService.IndexShardCacheEntity(indexShard)))
         );
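+        // Capacity is one byte above the measured footprint of two entries, so
+        // caching a third value below must evict exactly one of them.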
-        AtomicBoolean indexShard = new AtomicBoolean(true);
-        ShardRequestCache requestCacheStats = new ShardRequestCache();

         Directory dir = newDirectory();
         IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig());

@@ -255,36 +283,44 @@ public void testEviction() throws Exception {
         DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1));
         TermQueryBuilder termQuery = new TermQueryBuilder("id", "0");
         BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false);
-        TestEntity entity = new TestEntity(requestCacheStats, indexShard);
+        IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard);
         Loader loader = new Loader(reader, 0);
         writer.updateDocument(new Term("id", "0"), newDoc(0, "bar"));
         DirectoryReader secondReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1));
-        TestEntity secondEntity = new TestEntity(requestCacheStats, indexShard);
+        IndicesService.IndexShardCacheEntity secondEntity = new IndicesService.IndexShardCacheEntity(indexShard);
         Loader secondLoader = new Loader(secondReader, 0);
         writer.updateDocument(new Term("id", "0"), newDoc(0, "baz"));
         DirectoryReader thirdReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1));
-        TestEntity thirddEntity = new TestEntity(requestCacheStats, indexShard);
+        IndicesService.IndexShardCacheEntity thirddEntity = new IndicesService.IndexShardCacheEntity(indexShard);
         Loader thirdLoader = new Loader(thirdReader, 0);

         BytesReference value1 = cache.getOrCompute(entity, loader, reader, termBytes);
         assertEquals("foo", value1.streamInput().readString());
         BytesReference value2 = cache.getOrCompute(secondEntity, secondLoader, secondReader, termBytes);
         assertEquals("bar", value2.streamInput().readString());
-        logger.info("Memory size: {}", requestCacheStats.stats().getMemorySize());
+        logger.info("Memory size: {}", indexShard.requestCache().stats().getMemorySize());
         BytesReference value3 = cache.getOrCompute(thirddEntity, thirdLoader, thirdReader, termBytes);
         assertEquals("baz", value3.streamInput().readString());
         assertEquals(2, cache.count());
-        assertEquals(1, requestCacheStats.stats().getEvictions());
+        assertEquals(1, indexShard.requestCache().stats().getEvictions());
         IOUtils.close(reader, secondReader, thirdReader, writer, dir, cache);
     }

     public void testClearAllEntityIdentity() throws Exception {
-        IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY);
-        AtomicBoolean indexShard = new AtomicBoolean(true);
+        IndicesService indicesService = getInstanceFromNode(IndicesService.class);
+        IndexShard indexShard = createIndex("test").getShard(0);
+        IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, (shardId -> {
+            IndexService indexService = null;
+            try {
+                indexService = indicesService.indexServiceSafe(shardId.getIndex());
+            } catch (IndexNotFoundException ex) {
+                return Optional.empty();
+            }
+            return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id())));
+        }));
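+        // Entries are keyed per shard: clearing through an entity of the "test"
+        // shard below must leave the separate "test1" shard's entry untouched.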
-        ShardRequestCache requestCacheStats = new ShardRequestCache();

         Directory dir = newDirectory();
         IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig());

@@ -292,36 +328,39 @@ public void testClearAllEntityIdentity() throws Exception {
         DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1));
         TermQueryBuilder termQuery = new TermQueryBuilder("id", "0");
         BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false);
-        TestEntity entity = new TestEntity(requestCacheStats, indexShard);
+        IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard);
         Loader loader = new Loader(reader, 0);
         writer.updateDocument(new Term("id", "0"), newDoc(0, "bar"));
         DirectoryReader secondReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1));
-        TestEntity secondEntity = new TestEntity(requestCacheStats, indexShard);
+        IndicesService.IndexShardCacheEntity secondEntity = new IndicesService.IndexShardCacheEntity(indexShard);
         Loader secondLoader = new Loader(secondReader, 0);
         writer.updateDocument(new Term("id", "0"), newDoc(0, "baz"));
         DirectoryReader thirdReader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1));
-        AtomicBoolean differentIdentity = new AtomicBoolean(true);
-        TestEntity thirddEntity = new TestEntity(requestCacheStats, differentIdentity);
+        IndicesService.IndexShardCacheEntity thirddEntity = new IndicesService.IndexShardCacheEntity(createIndex("test1").getShard(0));
         Loader thirdLoader = new Loader(thirdReader, 0);

         BytesReference value1 = cache.getOrCompute(entity, loader, reader, termBytes);
         assertEquals("foo", value1.streamInput().readString());
         BytesReference value2 = cache.getOrCompute(secondEntity, secondLoader, secondReader, termBytes);
         assertEquals("bar", value2.streamInput().readString());
-        logger.info("Memory size: {}", requestCacheStats.stats().getMemorySize());
+        logger.info("Memory size: {}", indexShard.requestCache().stats().getMemorySize());
         BytesReference value3 = cache.getOrCompute(thirddEntity, thirdLoader, thirdReader, termBytes);
         assertEquals("baz", value3.streamInput().readString());
         assertEquals(3, cache.count());
-        final long hitCount = requestCacheStats.stats().getHitCount();
+        RequestCacheStats requestCacheStats = entity.stats().stats();
+        requestCacheStats.add(thirddEntity.stats().stats());
+        final long hitCount = requestCacheStats.getHitCount();
         // clear all for the indexShard Idendity even though is't still open
         cache.clear(randomFrom(entity, secondEntity));
         cache.cleanCache();
         assertEquals(1, cache.count());
         // third has not been validated since it's a different identity
         value3 = cache.getOrCompute(thirddEntity, thirdLoader, thirdReader, termBytes);
-        assertEquals(hitCount + 1, requestCacheStats.stats().getHitCount());
+        requestCacheStats = entity.stats().stats();
+        requestCacheStats.add(thirddEntity.stats().stats());
+        assertEquals(hitCount + 1, requestCacheStats.getHitCount());
         assertEquals("baz", value3.streamInput().readString());

         IOUtils.close(reader, secondReader, thirdReader, writer, dir, cache);
@@ -365,8 +404,17 @@ public BytesReference get() {
     }

     public void testInvalidate() throws Exception {
-        ShardRequestCache requestCacheStats = new ShardRequestCache();
-        IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY);
+        IndicesService indicesService = getInstanceFromNode(IndicesService.class);
+        IndexShard indexShard = createIndex("test").getShard(0);
+        IndicesRequestCache cache = new IndicesRequestCache(Settings.EMPTY, (shardId -> {
+            IndexService indexService = null;
+            try {
+                indexService = indicesService.indexServiceSafe(shardId.getIndex());
+            } catch (IndexNotFoundException ex) {
+                return Optional.empty();
+            }
+            return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id())));
+        }));
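+        // invalidate() drops the cached entry, so the getOrCompute() that follows
+        // it below counts as a fresh miss while the earlier hit count is preserved.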
         Directory dir = newDirectory();
         IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig());

@@ -374,13 +422,13 @@ public void testInvalidate() throws Exception {
         DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), new ShardId("foo", "bar", 1));
         TermQueryBuilder termQuery = new TermQueryBuilder("id", "0");
         BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false);
-        AtomicBoolean indexShard = new AtomicBoolean(true);

         // initial cache
-        TestEntity entity = new TestEntity(requestCacheStats, indexShard);
+        IndicesService.IndexShardCacheEntity entity = new IndicesService.IndexShardCacheEntity(indexShard);
         Loader loader = new Loader(reader, 0);
         BytesReference value = cache.getOrCompute(entity, loader, reader, termBytes);
         assertEquals("foo", value.streamInput().readString());
+        ShardRequestCache requestCacheStats = entity.stats();
         assertEquals(0, requestCacheStats.stats().getHitCount());
         assertEquals(1, requestCacheStats.stats().getMissCount());
         assertEquals(0, requestCacheStats.stats().getEvictions());
@@ -388,10 +436,11 @@ public void testInvalidate() throws Exception {
         assertEquals(1, cache.count());

         // cache hit
-        entity = new TestEntity(requestCacheStats, indexShard);
+        entity = new IndicesService.IndexShardCacheEntity(indexShard);
         loader = new Loader(reader, 0);
         value = cache.getOrCompute(entity, loader, reader, termBytes);
         assertEquals("foo", value.streamInput().readString());
+        requestCacheStats = entity.stats();
         assertEquals(1, requestCacheStats.stats().getHitCount());
         assertEquals(1, requestCacheStats.stats().getMissCount());
         assertEquals(0, requestCacheStats.stats().getEvictions());
@@ -401,11 +450,12 @@ public void testInvalidate() throws Exception {
         assertEquals(1, cache.numRegisteredCloseListeners());

         // load again after invalidate
-        entity = new TestEntity(requestCacheStats, indexShard);
+        entity = new IndicesService.IndexShardCacheEntity(indexShard);
         loader = new Loader(reader, 0);
         cache.invalidate(entity, reader, termBytes);
         value = cache.getOrCompute(entity, loader, reader, termBytes);
         assertEquals("foo", value.streamInput().readString());
+        requestCacheStats = entity.stats();
         assertEquals(1, requestCacheStats.stats().getHitCount());
         assertEquals(2, requestCacheStats.stats().getMissCount());
         assertEquals(0, requestCacheStats.stats().getEvictions());
@@ -418,7 +468,7 @@ public void testInvalidate() throws Exception {
         if (randomBoolean()) {
             reader.close();
         } else {
-            indexShard.set(false); // closed shard but reader is still open
+            indexShard.close("test", true, true); // closed shard but reader is still open
             cache.clear(entity);
         }
         cache.cleanCache();
@@ -433,22 +483,25 @@ public void testInvalidate() throws Exception {
     }

     public void testEqualsKey() throws IOException {
-        AtomicBoolean trueBoolean = new AtomicBoolean(true);
-        AtomicBoolean falseBoolean = new AtomicBoolean(false);
+        IndicesService indicesService = getInstanceFromNode(IndicesService.class);
         Directory dir = newDirectory();
         IndexWriterConfig config = newIndexWriterConfig();
         IndexWriter writer = new IndexWriter(dir, config);
-        IndexReader reader1 = DirectoryReader.open(writer);
-        IndexReader.CacheKey rKey1 = reader1.getReaderCacheHelper().getKey();
+        ShardId shardId = new ShardId("foo", "bar", 1);
+        ShardId shardId1 = new ShardId("foo1", "bar1", 2);
+        IndexReader reader1 = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), shardId);
+        String rKey1 = ((OpenSearchDirectoryReader) reader1).getDelegatingCacheHelper().getDelegatingCacheKey().getId();
         writer.addDocument(new Document());
-        IndexReader reader2 = DirectoryReader.open(writer);
-        IndexReader.CacheKey rKey2 = reader2.getReaderCacheHelper().getKey();
+        IndexReader reader2 = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), shardId);
+        String rKey2 = ((OpenSearchDirectoryReader) reader2).getDelegatingCacheHelper().getDelegatingCacheKey().getId();
         IOUtils.close(reader1, reader2, writer, dir);
-        IndicesRequestCache.Key key1 = new IndicesRequestCache.Key(new TestEntity(null, trueBoolean), rKey1, new TestBytesReference(1));
-        IndicesRequestCache.Key key2 = new IndicesRequestCache.Key(new TestEntity(null, trueBoolean), rKey1, new TestBytesReference(1));
-        IndicesRequestCache.Key key3 = new IndicesRequestCache.Key(new TestEntity(null, falseBoolean), rKey1, new TestBytesReference(1));
-        IndicesRequestCache.Key key4 = new IndicesRequestCache.Key(new TestEntity(null, trueBoolean), rKey2, new TestBytesReference(1));
-        IndicesRequestCache.Key key5 = new IndicesRequestCache.Key(new TestEntity(null, trueBoolean), rKey1, new TestBytesReference(2));
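+        // Keys are now value-based (ShardId, request bytes, reader cache key id)
+        // rather than entity-identity based, which is what allows them to be
+        // serialized (see testSerializationDeserializationOfCacheKey below).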
+        IndexShard indexShard = mock(IndexShard.class);
+        when(indexShard.state()).thenReturn(IndexShardState.STARTED);
+        IndicesRequestCache.Key key1 = new IndicesRequestCache.Key(shardId, new TestBytesReference(1), rKey1);
+        IndicesRequestCache.Key key2 = new IndicesRequestCache.Key(shardId, new TestBytesReference(1), rKey1);
+        IndicesRequestCache.Key key3 = new IndicesRequestCache.Key(shardId1, new TestBytesReference(1), rKey1);
+        IndicesRequestCache.Key key4 = new IndicesRequestCache.Key(shardId, new TestBytesReference(1), rKey2);
+        IndicesRequestCache.Key key5 = new IndicesRequestCache.Key(shardId, new TestBytesReference(2), rKey2);
         String s = "Some other random object";
         assertEquals(key1, key1);
         assertEquals(key1, key2);
@@ -459,6 +512,29 @@ public void testEqualsKey() throws IOException {
         assertNotEquals(key1, key5);
     }

+    public void testSerializationDeserializationOfCacheKey() throws Exception {
+        TermQueryBuilder termQuery = new TermQueryBuilder("id", "0");
+        BytesReference termBytes = XContentHelper.toXContent(termQuery, MediaTypeRegistry.JSON, false);
+        IndexService indexService = createIndex("test");
+        IndexShard indexShard = indexService.getShard(0);
+        IndicesService.IndexShardCacheEntity shardCacheEntity = new IndicesService.IndexShardCacheEntity(indexShard);
+        String readerCacheKeyId = UUID.randomUUID().toString();
+        IndicesRequestCache.Key key1 = new IndicesRequestCache.Key(indexShard.shardId(), termBytes, readerCacheKeyId);
+        BytesReference bytesReference = null;
+        try (BytesStreamOutput out = new BytesStreamOutput()) {
+            key1.writeTo(out);
+            bytesReference = out.bytes();
+        }
+        StreamInput in = bytesReference.streamInput();
+
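+        // Reading the serialized bytes back must reproduce every field written above.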
+        IndicesRequestCache.Key key2 = new IndicesRequestCache.Key(in);
+
+        assertEquals(readerCacheKeyId, key2.readerCacheKeyId);
+        assertEquals(((IndexShard) shardCacheEntity.getCacheIdentity()).shardId(), key2.shardId);
+        assertEquals(termBytes, key2.value);
+
+    }
+
     private class TestBytesReference extends AbstractBytesReference {

         int dummyValue;
@@ -509,34 +585,4 @@ public boolean isFragment() {
             return false;
         }
     }
-
-    private class TestEntity extends AbstractIndexShardCacheEntity {
-        private final AtomicBoolean standInForIndexShard;
-        private final ShardRequestCache shardRequestCache;
-
-        private TestEntity(ShardRequestCache shardRequestCache, AtomicBoolean standInForIndexShard) {
-            this.standInForIndexShard = standInForIndexShard;
-            this.shardRequestCache = shardRequestCache;
-        }
-
-        @Override
-        protected ShardRequestCache stats() {
-            return shardRequestCache;
-        }
-
-        @Override
-        public boolean isOpen() {
-            return standInForIndexShard.get();
-        }
-
-        @Override
-        public Object getCacheIdentity() {
-            return standInForIndexShard;
-        }
-
-        @Override
-        public long ramBytesUsed() {
-            return 42;
-        }
-    }
 }
diff --git a/server/src/test/java/org/opensearch/indices/IndicesServiceCloseTests.java b/server/src/test/java/org/opensearch/indices/IndicesServiceCloseTests.java
index add1dfc348a8b..8a00cd2db21c9 100644
--- a/server/src/test/java/org/opensearch/indices/IndicesServiceCloseTests.java
+++ b/server/src/test/java/org/opensearch/indices/IndicesServiceCloseTests.java
@@ -44,18 +44,15 @@
 import org.apache.lucene.search.Weight;
 import org.opensearch.cluster.ClusterName;
 import org.opensearch.cluster.routing.allocation.DiskThresholdSettings;
-import org.opensearch.common.cache.RemovalNotification;
 import org.opensearch.common.settings.Settings;
 import org.opensearch.common.util.concurrent.OpenSearchExecutors;
 import org.opensearch.core.common.bytes.BytesArray;
-import org.opensearch.core.common.bytes.BytesReference;
 import org.opensearch.env.Environment;
 import org.opensearch.env.NodeEnvironment;
 import org.opensearch.index.IndexModule;
 import org.opensearch.index.IndexService;
 import org.opensearch.index.engine.Engine;
 import org.opensearch.index.shard.IndexShard;
-import org.opensearch.indices.IndicesRequestCache.Key;
 import org.opensearch.indices.breaker.HierarchyCircuitBreakerService;
 import org.opensearch.node.MockNode;
 import org.opensearch.node.Node;
@@ -373,15 +370,12 @@ public void testCloseWhileOngoingRequestUsesRequestCache() throws Exception {
         assertEquals(1, indicesService.indicesRefCount.refCount());
         assertEquals(0L, cache.count());

-        IndicesRequestCache.CacheEntity cacheEntity = new IndicesRequestCache.CacheEntity() {
+        IndicesService.IndexShardCacheEntity cacheEntity = new IndicesService.IndexShardCacheEntity(shard) {
             @Override
             public long ramBytesUsed() {
                 return 42;
             }

-            @Override
-            public void onCached(Key key, BytesReference value) {}
-
             @Override
             public boolean isOpen() {
                 return true;
@@ -389,17 +383,8 @@ public boolean isOpen() {

             @Override
             public Object getCacheIdentity() {
-                return this;
+                return shard;
             }
-
-            @Override
-            public void onHit() {}
-
-            @Override
-            public void onMiss() {}
-
-            @Override
-            public void onRemoval(RemovalNotification<Key, BytesReference> notification) {}
         };
         cache.getOrCompute(cacheEntity, () -> new BytesArray("bar"), searcher.getDirectoryReader(), new BytesArray("foo"));
         assertEquals(1L, cache.count());