From 662ab417ac36437fff4bd4d648a7f8dfebea9c88 Mon Sep 17 00:00:00 2001 From: Jay Deng Date: Wed, 10 Jan 2024 15:55:26 -0800 Subject: [PATCH] Correctly calculate doc_count_error at the slice level for concurrent segment search. Change slice_size heuristic to be equal to shard_size. Signed-off-by: Jay Deng --- CHANGELOG.md | 1 + .../aggregations/bucket/ShardSizeTermsIT.java | 24 +- .../bucket/TermsDocCountErrorIT.java | 5 + .../bucket/TermsFixedDocCountErrorIT.java | 347 ++++++++++++++++++ .../bucket/TermsShardMinDocCountIT.java | 16 +- .../bucket/terms/InternalTerms.java | 21 +- .../search/internal/SearchContext.java | 11 +- 7 files changed, 402 insertions(+), 23 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/TermsFixedDocCountErrorIT.java diff --git a/CHANGELOG.md b/CHANGELOG.md index fed2bb54783a7..cfb314655e5d8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -192,6 +192,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Restore support for Java 8 for RestClient ([#11562](https://github.com/opensearch-project/OpenSearch/pull/11562)) - Add deleted doc count in _cat/shards ([#11678](https://github.com/opensearch-project/OpenSearch/pull/11678)) - Capture information for additional query types and aggregation types ([#11582](https://github.com/opensearch-project/OpenSearch/pull/11582)) +- Use slice_size == shard_size heuristic in terms aggs for concurrent segment search and properly calculate the doc_count_error ([#11732](https://github.com/opensearch-project/OpenSearch/pull/11732)) ### Deprecated diff --git a/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/ShardSizeTermsIT.java b/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/ShardSizeTermsIT.java index 145830f02ee56..222c2a23d8c71 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/ShardSizeTermsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/ShardSizeTermsIT.java @@ -86,6 +86,7 @@ public void testShardSizeEqualsSizeString() throws Exception { terms("keys").field("key") .size(3) .shardSize(3) + .showTermDocCountError(true) .collectMode(randomFrom(SubAggCollectionMode.values())) .order(BucketOrder.count(false)) ) @@ -93,13 +94,16 @@ public void testShardSizeEqualsSizeString() throws Exception { Terms terms = response.getAggregations().get("keys"); List buckets = terms.getBuckets(); - assertThat(buckets.size(), equalTo(3)); + assertEquals(3, buckets.size()); Map expected = new HashMap<>(); expected.put("1", 8L); expected.put("3", 8L); expected.put("2", 4L); + Long expectedDocCount; for (Terms.Bucket bucket : buckets) { - assertThat(bucket.getDocCount(), equalTo(expected.get(bucket.getKeyAsString()))); + expectedDocCount = expected.get(bucket.getKeyAsString()); + // Doc count can vary when using concurrent segment search. See https://github.com/opensearch-project/OpenSearch/issues/11680 + assertTrue((bucket.getDocCount() == expectedDocCount) || bucket.getDocCount() + bucket.getDocCountError() >= expectedDocCount); } } @@ -221,6 +225,7 @@ public void testShardSizeEqualsSizeLong() throws Exception { terms("keys").field("key") .size(3) .shardSize(3) + .showTermDocCountError(true) .collectMode(randomFrom(SubAggCollectionMode.values())) .order(BucketOrder.count(false)) ) @@ -228,13 +233,16 @@ public void testShardSizeEqualsSizeLong() throws Exception { Terms terms = response.getAggregations().get("keys"); List buckets = terms.getBuckets(); - assertThat(buckets.size(), equalTo(3)); + assertEquals(3, buckets.size()); Map expected = new HashMap<>(); expected.put(1, 8L); expected.put(3, 8L); expected.put(2, 4L); + Long expectedDocCount; for (Terms.Bucket bucket : buckets) { - assertThat(bucket.getDocCount(), equalTo(expected.get(bucket.getKeyAsNumber().intValue()))); + expectedDocCount = expected.get(bucket.getKeyAsNumber().intValue()); + // Doc count can vary when using concurrent segment search. See https://github.com/opensearch-project/OpenSearch/issues/11680 + assertTrue((bucket.getDocCount() == expectedDocCount) || bucket.getDocCount() + bucket.getDocCountError() >= expectedDocCount); } } @@ -355,6 +363,7 @@ public void testShardSizeEqualsSizeDouble() throws Exception { terms("keys").field("key") .size(3) .shardSize(3) + .showTermDocCountError(true) .collectMode(randomFrom(SubAggCollectionMode.values())) .order(BucketOrder.count(false)) ) @@ -362,13 +371,16 @@ public void testShardSizeEqualsSizeDouble() throws Exception { Terms terms = response.getAggregations().get("keys"); List buckets = terms.getBuckets(); - assertThat(buckets.size(), equalTo(3)); + assertEquals(3, buckets.size()); Map expected = new HashMap<>(); expected.put(1, 8L); expected.put(3, 8L); expected.put(2, 4L); + Long expectedDocCount; for (Terms.Bucket bucket : buckets) { - assertThat(bucket.getDocCount(), equalTo(expected.get(bucket.getKeyAsNumber().intValue()))); + expectedDocCount = expected.get(bucket.getKeyAsNumber().intValue()); + // Doc count can vary when using concurrent segment search. See https://github.com/opensearch-project/OpenSearch/issues/11680 + assertTrue((bucket.getDocCount() == expectedDocCount) || bucket.getDocCount() + bucket.getDocCountError() >= expectedDocCount); } } diff --git a/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/TermsDocCountErrorIT.java b/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/TermsDocCountErrorIT.java index b355ce6d7a8dd..78fd86a8e4300 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/TermsDocCountErrorIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/TermsDocCountErrorIT.java @@ -1106,6 +1106,11 @@ public void testDoubleValueFieldSubAggDesc() throws Exception { * 3 one-shard indices. */ public void testFixedDocs() throws Exception { + // Force merge each shard down to 1 segment to verify results are the same between concurrent and non-concurrent search paths + // Else for concurrent segment search there will be additional error introduced during the slice level reduce and thus different + // buckets may be returned. See https://github.com/opensearch-project/OpenSearch/issues/11680", + client().admin().indices().prepareForceMerge("idx_fixed_docs_0", "idx_fixed_docs_1", "idx_fixed_docs_2").setMaxNumSegments(1).get(); + SearchResponse response = client().prepareSearch("idx_fixed_docs_0", "idx_fixed_docs_1", "idx_fixed_docs_2") .addAggregation( terms("terms").executionHint(randomExecutionHint()) diff --git a/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/TermsFixedDocCountErrorIT.java b/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/TermsFixedDocCountErrorIT.java new file mode 100644 index 0000000000000..5ad913e8c7086 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/TermsFixedDocCountErrorIT.java @@ -0,0 +1,347 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.aggregations.bucket; + +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; + +import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.search.aggregations.bucket.terms.Terms; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.ParameterizedOpenSearchIntegTestCase; + +import java.util.Arrays; +import java.util.Collection; + +import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING; +import static org.opensearch.search.aggregations.AggregationBuilders.terms; +import static org.opensearch.test.OpenSearchIntegTestCase.Scope.TEST; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; + +@OpenSearchIntegTestCase.ClusterScope(scope = TEST, numClientNodes = 0, maxNumDataNodes = 1, supportsDedicatedMasters = false) +public class TermsFixedDocCountErrorIT extends ParameterizedOpenSearchIntegTestCase { + + private static final String STRING_FIELD_NAME = "s_value"; + + public TermsFixedDocCountErrorIT(Settings dynamicSettings) { + super(dynamicSettings); + } + + @ParametersFactory + public static Collection parameters() { + return Arrays.asList( + new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() }, + new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build() } + ); + } + + @Override + protected Settings featureFlagSettings() { + return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.CONCURRENT_SEGMENT_SEARCH, "true").build(); + } + + public void testSimpleAggErrorMultiShard() throws Exception { + // size = 1, shard_size = 2 + // Shard_1 [A, A, A, A, B, B, C, C, D, D] -> Buckets {"A" : 4, "B" : 2} + // Shard_2 [A, B, B, B, C, C, C, D, D, D] -> Buckets {"B" : 3, "C" : 3} + // coordinator -> Buckets {"B" : 5, "A" : 4} + // Agg error is 4, from (shard_size)th bucket on each shard + // Bucket "A" error is 2, from (shard_size)th bucket on shard_2 + // Bucket "B" error is 0, it's present on both shards + + // size = 1 shard_size = 1 slice_size = 1 + // non-cs / cs + // Shard_1 [A, B, C] + // Shard_2 [B, C, D] + // cs + // Shard_1 slice_1 [A, B, C] -> {a : 1} -> {a : 1 -- error: 1} + // slice_2 [B, C, D] -> {b : 1} + // Coordinator should return the same doc count error in both cases + + assertAcked( + prepareCreate("idx_mshard_1").setMapping(STRING_FIELD_NAME, "type=keyword") + .setSettings( + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + ) + ); + client().prepareIndex("idx_mshard_1").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "A").endObject()).get(); + client().prepareIndex("idx_mshard_1").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "A").endObject()).get(); + client().prepareIndex("idx_mshard_1").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "A").endObject()).get(); + client().prepareIndex("idx_mshard_1").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "A").endObject()).get(); + client().prepareIndex("idx_mshard_1").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "B").endObject()).get(); + client().prepareIndex("idx_mshard_1").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "B").endObject()).get(); + client().prepareIndex("idx_mshard_1").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "C").endObject()).get(); + refresh("idx_mshard_1"); + + IndicesSegmentResponse segmentResponse = client().admin().indices().prepareSegments("idx_mshard_1").get(); + assertEquals(1, segmentResponse.getIndices().get("idx_mshard_1").getShards().get(0).getShards()[0].getSegments().size()); + + assertAcked( + prepareCreate("idx_mshard_2").setMapping(STRING_FIELD_NAME, "type=keyword") + .setSettings( + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + ) + ); + client().prepareIndex("idx_mshard_2").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "A").endObject()).get(); + client().prepareIndex("idx_mshard_2").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "B").endObject()).get(); + client().prepareIndex("idx_mshard_2").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "B").endObject()).get(); + client().prepareIndex("idx_mshard_2").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "B").endObject()).get(); + client().prepareIndex("idx_mshard_2").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "C").endObject()).get(); + client().prepareIndex("idx_mshard_2").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "C").endObject()).get(); + refresh("idx_mshard_2"); + + segmentResponse = client().admin().indices().prepareSegments("idx_mshard_2").get(); + assertEquals(1, segmentResponse.getIndices().get("idx_mshard_2").getShards().get(0).getShards()[0].getSegments().size()); + + SearchResponse response = client().prepareSearch("idx_mshard_2", "idx_mshard_1") + .setSize(0) + .addAggregation(terms("terms").field(STRING_FIELD_NAME).showTermDocCountError(true).size(2).shardSize(2)) + .get(); + + Terms terms = response.getAggregations().get("terms"); + assertEquals(2, terms.getBuckets().size()); + assertEquals(4, terms.getDocCountError()); + + Terms.Bucket bucket = terms.getBuckets().get(0); // Bucket "B" + assertEquals("B", bucket.getKey().toString()); + assertEquals(5, bucket.getDocCount()); + assertEquals(0, bucket.getDocCountError()); + + bucket = terms.getBuckets().get(1); // Bucket "A" + assertEquals("A", bucket.getKey().toString()); + assertEquals(4, bucket.getDocCount()); + assertEquals(2, bucket.getDocCountError()); + } + + public void testSimpleAggErrorSingleShard() throws Exception { + assertAcked( + prepareCreate("idx_shard_error").setMapping(STRING_FIELD_NAME, "type=keyword") + .setSettings( + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + ) + ); + client().prepareIndex("idx_shard_error").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "A").endObject()).get(); + client().prepareIndex("idx_shard_error").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "A").endObject()).get(); + client().prepareIndex("idx_shard_error").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "A").endObject()).get(); + client().prepareIndex("idx_shard_error").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "A").endObject()).get(); + client().prepareIndex("idx_shard_error").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "A").endObject()).get(); + client().prepareIndex("idx_shard_error").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "A").endObject()).get(); + client().prepareIndex("idx_shard_error").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "B").endObject()).get(); + client().prepareIndex("idx_shard_error").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "B").endObject()).get(); + refresh("idx_shard_error"); + + SearchResponse response = client().prepareSearch("idx_shard_error") + .setSize(0) + .addAggregation(terms("terms").field(STRING_FIELD_NAME).showTermDocCountError(true).size(1).shardSize(2)) + .get(); + + Terms terms = response.getAggregations().get("terms"); + assertEquals(1, terms.getBuckets().size()); + assertEquals(0, terms.getDocCountError()); + + Terms.Bucket bucket = terms.getBuckets().get(0); + assertEquals("A", bucket.getKey().toString()); + assertEquals(6, bucket.getDocCount()); + assertEquals(0, bucket.getDocCountError()); + } + + public void testSliceLevelDocCountErrorSingleShard() throws Exception { + assumeTrue( + "Slice level error is not relevant to non-concurrent search cases", + internalCluster().clusterService().getClusterSettings().get(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING) + ); + + // Slices are created by sorting segments by doc count in descending order then distributing in round robin fashion. + // Creates 2 segments (and therefore 2 slices since slice_count = 2) as follows: + // 1. [A, A, A, B, B, C] + // 2. [A, B, B, B, C, C] + // Thus we expect the doc count error for A to be 2 as the nth largest bucket on slice 2 has size 2 + + assertAcked( + prepareCreate("idx_slice_error").setMapping(STRING_FIELD_NAME, "type=keyword") + .setSettings( + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + ) + ); + client().prepareIndex("idx_slice_error").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "A").endObject()).get(); + client().prepareIndex("idx_slice_error").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "A").endObject()).get(); + client().prepareIndex("idx_slice_error").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "A").endObject()).get(); + client().prepareIndex("idx_slice_error").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "B").endObject()).get(); + client().prepareIndex("idx_slice_error").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "B").endObject()).get(); + client().prepareIndex("idx_slice_error").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "C").endObject()).get(); + refresh("idx_slice_error"); + + client().prepareIndex("idx_slice_error").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "A").endObject()).get(); + client().prepareIndex("idx_slice_error").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "B").endObject()).get(); + client().prepareIndex("idx_slice_error").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "B").endObject()).get(); + client().prepareIndex("idx_slice_error").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "B").endObject()).get(); + client().prepareIndex("idx_slice_error").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "C").endObject()).get(); + client().prepareIndex("idx_slice_error").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "C").endObject()).get(); + refresh("idx_slice_error"); + + IndicesSegmentResponse segmentResponse = client().admin().indices().prepareSegments("idx_slice_error").get(); + assertEquals(2, segmentResponse.getIndices().get("idx_slice_error").getShards().get(0).getShards()[0].getSegments().size()); + + // Confirm that there is no error when shard_size == slice_size > cardinality + SearchResponse response = client().prepareSearch("idx_slice_error") + .setSize(0) + .addAggregation(terms("terms").field(STRING_FIELD_NAME).showTermDocCountError(true).size(1).shardSize(4)) + .get(); + + Terms terms = response.getAggregations().get("terms"); + assertEquals(1, terms.getBuckets().size()); + assertEquals(0, terms.getDocCountError()); + + Terms.Bucket bucket = terms.getBuckets().get(0); // Bucket "B" + assertEquals("B", bucket.getKey().toString()); + assertEquals(5, bucket.getDocCount()); + assertEquals(0, bucket.getDocCountError()); + + response = client().prepareSearch("idx_slice_error") + .setSize(0) + .addAggregation(terms("terms").field(STRING_FIELD_NAME).showTermDocCountError(true).size(2).shardSize(2)) + .get(); + + terms = response.getAggregations().get("terms"); + assertEquals(2, terms.getBuckets().size()); + assertEquals(4, terms.getDocCountError()); + + bucket = terms.getBuckets().get(0); // Bucket "B" + assertEquals("B", bucket.getKey().toString()); + assertEquals(5, bucket.getDocCount()); + assertEquals(0, bucket.getDocCountError()); + + bucket = terms.getBuckets().get(1); // Bucket "A" + assertEquals("A", bucket.getKey().toString()); + assertEquals(3, bucket.getDocCount()); + assertEquals(2, bucket.getDocCountError()); + } + + public void testSliceLevelDocCountErrorMultiShard() throws Exception { + assumeTrue( + "Slice level error is not relevant to non-concurrent search cases", + internalCluster().clusterService().getClusterSettings().get(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING) + ); + + // Size = 2, shard_size = 2 + // Shard_1 [A, A, A, A, B, B, C, C] + // slice_1 [A, A, A, B, B, C] {"A" : 3, "B" : 2} + // slice_2 [A, C] {"A" : 1, "C" : 1} + // Shard_1 buckets: {"A" : 4 - error: 0, "B" : 2 - error: 1} + // Shard_2 [A, A, B, B, B, C, C, C] + // slice_1 [A, B, B, B, C, C] {"B" : 3, "C" : 2} + // slice_2 [A, C] {"A" : 1, "C" : 1} + // Shard_2 buckets: {"B" : 3 - error: 1, "C" : 3 - error: 0} + // Overall + // {"B" : 5 - error: 2, "A" : 4 - error: 3} Agg error: 6 + + assertAcked( + prepareCreate("idx_mshard_1").setMapping(STRING_FIELD_NAME, "type=keyword") + .setSettings( + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + ) + ); + client().prepareIndex("idx_mshard_1").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "A").endObject()).get(); + client().prepareIndex("idx_mshard_1").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "A").endObject()).get(); + client().prepareIndex("idx_mshard_1").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "A").endObject()).get(); + client().prepareIndex("idx_mshard_1").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "B").endObject()).get(); + client().prepareIndex("idx_mshard_1").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "B").endObject()).get(); + client().prepareIndex("idx_mshard_1").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "C").endObject()).get(); + refresh("idx_mshard_1"); + + client().prepareIndex("idx_mshard_1").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "A").endObject()).get(); + client().prepareIndex("idx_mshard_1").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "C").endObject()).get(); + refresh("idx_mshard_1"); + + IndicesSegmentResponse segmentResponse = client().admin().indices().prepareSegments("idx_mshard_1").get(); + assertEquals(2, segmentResponse.getIndices().get("idx_mshard_1").getShards().get(0).getShards()[0].getSegments().size()); + + SearchResponse response = client().prepareSearch("idx_mshard_1") + .setSize(0) + .addAggregation(terms("terms").field(STRING_FIELD_NAME).showTermDocCountError(true).size(2).shardSize(2)) + .get(); + + Terms terms = response.getAggregations().get("terms"); + assertEquals(2, terms.getBuckets().size()); + assertEquals(3, terms.getDocCountError()); + + Terms.Bucket bucket = terms.getBuckets().get(0); + assertEquals("A", bucket.getKey().toString()); + assertEquals(4, bucket.getDocCount()); + assertEquals(0, bucket.getDocCountError()); + + bucket = terms.getBuckets().get(1); + assertEquals("B", bucket.getKey().toString()); + assertEquals(2, bucket.getDocCount()); + assertEquals(1, bucket.getDocCountError()); + + assertAcked( + prepareCreate("idx_mshard_2").setMapping(STRING_FIELD_NAME, "type=keyword") + .setSettings( + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + ) + ); + client().prepareIndex("idx_mshard_2").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "A").endObject()).get(); + client().prepareIndex("idx_mshard_2").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "B").endObject()).get(); + client().prepareIndex("idx_mshard_2").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "B").endObject()).get(); + client().prepareIndex("idx_mshard_2").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "B").endObject()).get(); + client().prepareIndex("idx_mshard_2").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "C").endObject()).get(); + client().prepareIndex("idx_mshard_2").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "C").endObject()).get(); + refresh("idx_mshard_2"); + + client().prepareIndex("idx_mshard_2").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "A").endObject()).get(); + client().prepareIndex("idx_mshard_2").setSource(jsonBuilder().startObject().field(STRING_FIELD_NAME, "C").endObject()).get(); + refresh("idx_mshard_2"); + + segmentResponse = client().admin().indices().prepareSegments("idx_mshard_2").get(); + assertEquals(2, segmentResponse.getIndices().get("idx_mshard_2").getShards().get(0).getShards()[0].getSegments().size()); + + response = client().prepareSearch("idx_mshard_2") + .setSize(0) + .addAggregation(terms("terms").field(STRING_FIELD_NAME).showTermDocCountError(true).size(2).shardSize(2)) + .get(); + + terms = response.getAggregations().get("terms"); + assertEquals(2, terms.getBuckets().size()); + assertEquals(3, terms.getDocCountError()); + + bucket = terms.getBuckets().get(0); + assertEquals("B", bucket.getKey().toString()); + assertEquals(3, bucket.getDocCount()); + assertEquals(1, bucket.getDocCountError()); + + bucket = terms.getBuckets().get(1); + assertEquals("C", bucket.getKey().toString()); + assertEquals(3, bucket.getDocCount()); + assertEquals(0, bucket.getDocCountError()); + + response = client().prepareSearch("idx_mshard_2", "idx_mshard_1") + .setSize(0) + .addAggregation(terms("terms").field(STRING_FIELD_NAME).showTermDocCountError(true).size(2).shardSize(2)) + .get(); + + terms = response.getAggregations().get("terms"); + assertEquals(2, terms.getBuckets().size()); + assertEquals(6, terms.getDocCountError()); + + bucket = terms.getBuckets().get(0); + assertEquals("B", bucket.getKey().toString()); + assertEquals(5, bucket.getDocCount()); + assertEquals(2, bucket.getDocCountError()); + + bucket = terms.getBuckets().get(1); + assertEquals("A", bucket.getKey().toString()); + assertEquals(4, bucket.getDocCount()); + assertEquals(3, bucket.getDocCountError()); + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/TermsShardMinDocCountIT.java b/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/TermsShardMinDocCountIT.java index b0d8e7ea02e8f..3851b16551795 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/TermsShardMinDocCountIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/TermsShardMinDocCountIT.java @@ -88,6 +88,10 @@ private static String randomExecutionHint() { // see https://github.com/elastic/elasticsearch/issues/5998 public void testShardMinDocCountSignificantTermsTest() throws Exception { + assumeFalse( + "For concurrent segment search shard_min_doc_count is not enforced at the slice level. See https://github.com/opensearch-project/OpenSearch/issues/11847", + internalCluster().clusterService().getClusterSettings().get(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING) + ); String textMappings; if (randomBoolean()) { textMappings = "type=long"; @@ -157,6 +161,10 @@ private void addTermsDocs(String term, int numInClass, int numNotInClass, List aggregations) { private long getDocCountError(InternalTerms terms, ReduceContext reduceContext) { int size = terms.getBuckets().size(); - // doc_count_error is always computed at the coordinator based on the buckets returned by the shards. This should be 0 during the - // shard level reduce as no buckets are being pruned at this stage. - if (reduceContext.isSliceLevel() || size == 0 || size < terms.getShardSize() || isKeyOrder(terms.order)) { + if (size == 0 || size < terms.getShardSize() || isKeyOrder(terms.order)) { return 0; } else if (InternalOrder.isCountDesc(terms.order)) { if (terms.getDocCountError() > 0) { @@ -398,6 +397,12 @@ public InternalAggregation reduce(List aggregations, Reduce for (InternalAggregation aggregation : aggregations) { @SuppressWarnings("unchecked") InternalTerms terms = (InternalTerms) aggregation; + // For Concurrent Segment Search the aggregation will have a computed doc count error coming from the shards. + // We use the existence of this doc count error to determine whether or not doc count error originated from the slice level + // and if so we will maintain the doc count error for the 1 shard case at the coordinator level + if (aggregations.size() == 1 && terms.getDocCountError() > 0) { + hasSliceLevelDocCountError = true; + } if (referenceTerms == null && aggregation.getClass().equals(UnmappedTerms.class) == false) { referenceTerms = terms; } @@ -500,7 +505,11 @@ For backward compatibility, we disable the merge sort and use ({@link InternalTe if (sumDocCountError == -1) { docCountError = -1; } else { - docCountError = aggregations.size() == 1 ? 0 : sumDocCountError; + if (hasSliceLevelDocCountError) { + docCountError = sumDocCountError; + } else { + docCountError = aggregations.size() == 1 ? 0 : sumDocCountError; + } } // Shards must return buckets sorted by key, so we apply the sort here in shard level reduce @@ -512,7 +521,7 @@ For backward compatibility, we disable the merge sort and use ({@link InternalTe @Override protected B reduceBucket(List buckets, ReduceContext context) { - assert buckets.size() > 0; + assert !buckets.isEmpty(); long docCount = 0; // For the per term doc count error we add up the errors from the // shards that did not respond with the term. To do this we add up @@ -523,7 +532,7 @@ protected B reduceBucket(List buckets, ReduceContext context) { for (B bucket : buckets) { docCount += bucket.getDocCount(); if (docCountError != -1) { - if (bucket.showDocCountError() == false || bucket.getDocCountError() == -1) { + if (bucket.showDocCountError() == false) { docCountError = -1; } else { docCountError += bucket.getDocCountError(); diff --git a/server/src/main/java/org/opensearch/search/internal/SearchContext.java b/server/src/main/java/org/opensearch/search/internal/SearchContext.java index cc43f4e5d79fb..02837da64dafd 100644 --- a/server/src/main/java/org/opensearch/search/internal/SearchContext.java +++ b/server/src/main/java/org/opensearch/search/internal/SearchContext.java @@ -86,8 +86,6 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; -import static org.opensearch.search.aggregations.bucket.BucketUtils.suggestShardSideQueueSize; - /** * This class encapsulates the state needed to execute a search. It holds a reference to the * shards point in time snapshot (IndexReader / ContextIndexSearcher) and allows passing on @@ -410,11 +408,10 @@ public boolean shouldUseConcurrentSearch() { * Returns local bucket count thresholds based on concurrent segment search status */ public LocalBucketCountThresholds asLocalBucketCountThresholds(TermsAggregator.BucketCountThresholds bucketCountThresholds) { - if (shouldUseConcurrentSearch()) { - return new LocalBucketCountThresholds(0, suggestShardSideQueueSize(bucketCountThresholds.getShardSize())); - } else { - return new LocalBucketCountThresholds(bucketCountThresholds.getShardMinDocCount(), bucketCountThresholds.getShardSize()); - } + return new LocalBucketCountThresholds( + shouldUseConcurrentSearch() ? 0 : bucketCountThresholds.getShardMinDocCount(), + bucketCountThresholds.getShardSize() + ); } /**