From 695488074c9044f540ae835bae8b4620fc344d1f Mon Sep 17 00:00:00 2001 From: Jay Deng Date: Fri, 15 Mar 2024 15:51:54 -0700 Subject: [PATCH] Parallelize build agg first try --- modules/lang-painless/build.gradle | 1 + .../org/opensearch/painless/TemporaryIT.java | 217 ++++++++++++++++++ .../aggregations/bucket/CompositeAggIT.java | 59 +++-- .../bucket/TermsDocCountErrorIT.java | 3 +- .../AggregationCollectorManager.java | 40 +++- .../search/aggregations/AggregatorBase.java | 14 ++ .../BucketCollectorProcessor.java | 30 +++ ...ggCollectorManagerWithSingleCollector.java | 2 + .../CompositeAggregationFactory.java | 2 +- .../bucket/composite/CompositeAggregator.java | 12 +- .../GlobalOrdinalsStringTermsAggregator.java | 26 ++- .../opensearch/test/OpenSearchTestCase.java | 8 +- 12 files changed, 383 insertions(+), 31 deletions(-) create mode 100644 modules/lang-painless/src/internalClusterTest/java/org/opensearch/painless/TemporaryIT.java diff --git a/modules/lang-painless/build.gradle b/modules/lang-painless/build.gradle index fb51a0bb7f157..790673feec411 100644 --- a/modules/lang-painless/build.gradle +++ b/modules/lang-painless/build.gradle @@ -33,6 +33,7 @@ import com.github.jengelman.gradle.plugins.shadow.ShadowBasePlugin apply plugin: 'opensearch.validate-rest-spec' apply plugin: 'opensearch.yaml-rest-test' +apply plugin: 'opensearch.internal-cluster-test' opensearchplugin { description 'An easy, safe and fast scripting language for OpenSearch' diff --git a/modules/lang-painless/src/internalClusterTest/java/org/opensearch/painless/TemporaryIT.java b/modules/lang-painless/src/internalClusterTest/java/org/opensearch/painless/TemporaryIT.java new file mode 100644 index 0000000000000..9c1ae553e4910 --- /dev/null +++ b/modules/lang-painless/src/internalClusterTest/java/org/opensearch/painless/TemporaryIT.java @@ -0,0 +1,217 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.painless; + +import org.opensearch.action.search.SearchResponse; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.core.xcontent.MediaTypeRegistry; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.index.query.QueryBuilder; +import org.opensearch.plugins.Plugin; +import org.opensearch.script.Script; +import org.opensearch.script.ScriptType; +import org.opensearch.search.SearchService; +import org.opensearch.search.aggregations.AggregationBuilder; +import org.opensearch.search.aggregations.AggregationBuilders; +import org.opensearch.search.aggregations.bucket.composite.TermsValuesSourceBuilder; +import org.opensearch.search.sort.FieldSortBuilder; +import org.opensearch.search.sort.SortOrder; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import static org.opensearch.index.query.QueryBuilders.boolQuery; +import static org.opensearch.index.query.QueryBuilders.termQuery; +import static org.opensearch.search.aggregations.PipelineAggregatorBuilders.bucketSort; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; + +public class TemporaryIT extends OpenSearchIntegTestCase { + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put(SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true) + .build(); + } + + @Override + protected Collection> nodePlugins() { + return List.of(PainlessModulePlugin.class); + } + + private void createTestIndex() throws IOException { + XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().startObject() + .field("dynamic", "false") + .startObject("_meta") + .field("schema_version", 5) + .endObject() + .startObject("properties") + .startObject("anomaly_grade").field("type", "double").endObject() + .startObject("anomaly_score").field("type", "double").endObject() + .startObject("approx_anomaly_start_time").field("type", "date").field("format", "strict_date_time||epoch_millis").endObject() + .startObject("confidence").field("type", "double").endObject() + .startObject("data_end_time").field("type", "date").field("format", "strict_date_time||epoch_millis").endObject() + .startObject("data_start_time").field("type", "date").field("format", "strict_date_time||epoch_millis").endObject() + .startObject("detector_id").field("type", "keyword").endObject() + .startObject("entity").field("type", "nested") + .startObject("properties") + .startObject("name").field("type", "keyword").endObject() + .startObject("value").field("type", "keyword").endObject() + .endObject() + .endObject() + .startObject("error").field("type", "text").endObject() + .startObject("execution_end_time").field("type", "date").field("format", "strict_date_time||epoch_millis").endObject() + .startObject("execution_start_time").field("type", "date").field("format", "strict_date_time||epoch_millis").endObject() + .startObject("expected_values").field("type", "nested") + .startObject("properties") + .startObject("likelihood").field("type", "double").endObject() + .startObject("value_list").field("type", "nested") + .startObject("properties") + .startObject("data").field("type", "double").endObject() + .startObject("feature_id").field("type", "keyword").endObject() + .endObject() + .endObject() + .endObject() + .endObject() + .startObject("feature_data").field("type", "nested") + .startObject("properties") + .startObject("data").field("type", "double").endObject() + .startObject("feature_id").field("type", "keyword").endObject() + .endObject() + .endObject() + .startObject("is_anomaly").field("type", "boolean").endObject() + .startObject("model_id").field("type", "keyword").endObject() + .startObject("past_values").field("type", "nested") + .startObject("properties") + .startObject("data").field("type", "double").endObject() + .startObject("feature_id").field("type", "keyword").endObject() + .endObject() + .endObject() + .startObject("relevant_attribution").field("type", "nested") + .startObject("properties") + .startObject("data").field("type", "double").endObject() + .startObject("feature_id").field("type", "keyword").endObject() + .endObject() + .endObject() + .startObject("schema_version").field("type", "integer").endObject() + .startObject("task_id").field("type", "keyword").endObject() + .startObject("threshold").field("type", "double").endObject() + .startObject("user").field("type", "nested") + .startObject("properties") + .startObject("backend_roles").field("type", "text").startObject("fields").startObject("keyword").field("type", "keyword").endObject().endObject().endObject() + .startObject("custom_attribute_names").field("type", "text").startObject("fields").startObject("keyword").field("type", "keyword").endObject().endObject().endObject() + .startObject("name").field("type", "text").startObject("fields").startObject("keyword").field("type", "keyword").field("ignore_above", 256).endObject().endObject().endObject() + .startObject("roles").field("type", "text").startObject("fields").startObject("keyword").field("type", "keyword").endObject().endObject().endObject() + .endObject() + .endObject() + .endObject() + .endObject(); + + assertAcked( + prepareCreate("test") + .setMapping(xContentBuilder) + .setSettings( + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + ) + ); + } + + private void indexTestData() { + client().prepareIndex("test").setId("gRbUF").setSource("{\"detector_id\":\"VqbXro0B0N8KJjAbG28Y\",\"schema_version\":0,\"data_start_time\":5,\"data_end_time\":5,\"feature_data\":[{\"feature_id\":\"WQgvo\",\"feature_name\":\"PVhgc\",\"data\":0.9212883816892278},{\"feature_id\":\"JulWB\",\"feature_name\":\"HgOGN\",\"data\":0.27831399526601086}],\"execution_start_time\":5,\"execution_end_time\":5,\"anomaly_score\":0.5,\"anomaly_grade\":0.8,\"confidence\":0.1705822118682151,\"entity\":[{\"name\":\"ip-field\",\"value\":\"1.2.3.4\"},{\"name\":\"keyword-field\",\"value\":\"field-1\"}],\"user\":{\"name\":\"PBJzgZpg\",\"backend_roles\":[\"giOWwAZcpU\"],\"roles\":[\"all_access\"],\"custom_attribute_names\":[\"attribute=test\"],\"user_requested_tenant\":null},\"approx_anomaly_start_time\":1708035355000,\"relevant_attribution\":[{\"feature_id\":\"piyfg\",\"data\":0.7797511350635153},{\"feature_id\":\"pFhPl\",\"data\":0.680814523323366}],\"past_values\":[{\"feature_id\":\"mECeN\",\"data\":0.8577224651498027},{\"feature_id\":\"SSHho\",\"data\":0.36525036781711573}],\"expected_values\":[{\"likelihood\":0.712699398152217,\"value_list\":[{\"feature_id\":\"wOPWI\",\"data\":0.09344528571943234},{\"feature_id\":\"HMZbM\",\"data\":0.8899196238445849}]}],\"threshold\":7.513042281539716}", MediaTypeRegistry.JSON).get(); + client().prepareIndex("test").setId("vWCJa").setSource("{\"detector_id\":\"VqbXro0B0N8KJjAbG28Y\",\"schema_version\":0,\"data_start_time\":5,\"data_end_time\":5,\"feature_data\":[{\"feature_id\":\"Lmcsm\",\"feature_name\":\"iDXfc\",\"data\":0.9674434291471465},{\"feature_id\":\"qSUQl\",\"feature_name\":\"qbEoF\",\"data\":0.6504223878706881}],\"execution_start_time\":5,\"execution_end_time\":5,\"anomaly_score\":0.5,\"anomaly_grade\":0.5,\"confidence\":0.06614591879270315,\"entity\":[{\"name\":\"ip-field\",\"value\":\"5.6.7.8\"},{\"name\":\"keyword-field\",\"value\":\"field-2\"}],\"user\":{\"name\":\"dJHBbnuu\",\"backend_roles\":[\"HXqCilWVMf\"],\"roles\":[\"all_access\"],\"custom_attribute_names\":[\"attribute=test\"],\"user_requested_tenant\":null},\"approx_anomaly_start_time\":1708035355000,\"relevant_attribution\":[{\"feature_id\":\"Ufhtc\",\"data\":0.08750171412108843},{\"feature_id\":\"uyJWb\",\"data\":0.9333680688095377}],\"past_values\":[{\"feature_id\":\"qskfI\",\"data\":0.970802420410941},{\"feature_id\":\"gYdme\",\"data\":0.847333030542884}],\"expected_values\":[{\"likelihood\":0.001994250912530804,\"value_list\":[{\"feature_id\":\"pnLad\",\"data\":0.1614332721050905},{\"feature_id\":\"BtBBh\",\"data\":0.5734485976838636}]}],\"threshold\":8.580216939299472}", MediaTypeRegistry.JSON).get(); + client().prepareIndex("test").setId("VnVkC").setSource("{\"detector_id\":\"VqbXro0B0N8KJjAbG28Y\",\"schema_version\":0,\"data_start_time\":5,\"data_end_time\":5,\"feature_data\":[{\"feature_id\":\"IqHwm\",\"feature_name\":\"LCnRh\",\"data\":0.8929177514663842},{\"feature_id\":\"IcaxA\",\"feature_name\":\"HLuxV\",\"data\":0.8975549333747292}],\"execution_start_time\":5,\"execution_end_time\":5,\"anomaly_score\":0.5,\"anomaly_grade\":0.2,\"confidence\":0.06244189871920458,\"entity\":[{\"name\":\"ip-field\",\"value\":\"5.6.7.8\"},{\"name\":\"keyword-field\",\"value\":\"field-2\"}],\"user\":{\"name\":\"IBhQUsrP\",\"backend_roles\":[\"AeewVXqCYO\"],\"roles\":[\"all_access\"],\"custom_attribute_names\":[\"attribute=test\"],\"user_requested_tenant\":null},\"approx_anomaly_start_time\":1708035355000,\"relevant_attribution\":[{\"feature_id\":\"EptJC\",\"data\":0.6875058309428451},{\"feature_id\":\"IKFpg\",\"data\":0.3419015294070341}],\"past_values\":[{\"feature_id\":\"KnVpN\",\"data\":0.7255993126008243},{\"feature_id\":\"NxgkL\",\"data\":0.6884725049479412}],\"expected_values\":[{\"likelihood\":0.7352436055910023,\"value_list\":[{\"feature_id\":\"Cvddb\",\"data\":0.7457298326060673},{\"feature_id\":\"QhtZU\",\"data\":0.7327525344956058}]}],\"threshold\":6.517648854225251}", MediaTypeRegistry.JSON).get(); + refresh("test"); + } + + public void test() throws Exception { + createTestIndex(); + indexTestData(); + + /** + * curl "localhost:57523/.opendistro-anomaly-results/_search?pretty" -H 'Content-Type: application/json' -d' + * quote> { + * "query": { + * "bool": { + * "filter": { + * "term": { + * "detector_id": "Ue39ro0BJngQavFLX2Q-" + * } + * } + * } + * }, + * "aggs": { + * "multi_buckets": { + * "composite": { + * "sources": [{ + * "keyword-field": { + * "terms": { + * "script": { + * "source": "String value = null; if (params == null || params._source == null || params._source.entity == null) { return \"\"; } for (item in params._source.entity) { if (item[\"name\"] == \"keyword-field\") { value = item['value']; break; } } return value;", + * "lang": "painless" + * } + * } + * } + * }] + * }, + * "aggregations": { + * "max": { + * "max": { + * "field": "anomaly_grade" + * } + * }, + * "multi_buckets_sort": { + * "bucket_sort": { + * "sort": [{ + * "max": { + * "order": "desc" + * } + * }], + * "size": 10 + * } + * } + * } + * } + * } + * }' + */ + + QueryBuilder query = boolQuery().filter( + termQuery("detector_id", "VqbXro0B0N8KJjAbG28Y") + ); + + AggregationBuilder agg = AggregationBuilders.composite("multi_buckets", + Collections.singletonList(new TermsValuesSourceBuilder("keyword-field") + .script(new Script(ScriptType.INLINE, "painless", "String value = null; if (params == null || params._source == null || params._source.entity == null) { return \"\"; } for (item in params._source.entity) { if (item[\"name\"] == \"keyword-field\") { value = item['value']; break; } } return value;", Collections.emptyMap())) + ) + ).subAggregation( + AggregationBuilders.max("max").field("anomaly_grade") + ).subAggregation( + bucketSort("multi_buckets_sort", Collections.singletonList(new FieldSortBuilder("max").order(SortOrder.DESC))) + .size(10) + ); + + + System.out.println(query); + System.out.println(agg); + + SearchResponse response = client().prepareSearch("test") + .setQuery(query) + .addAggregation(agg) + .get(); + + System.out.println(response); + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/CompositeAggIT.java b/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/CompositeAggIT.java index 5a38ba670f1dc..1cd1001d0b203 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/CompositeAggIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/CompositeAggIT.java @@ -10,6 +10,8 @@ import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; +import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse; +import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse; import org.opensearch.action.search.SearchResponse; import org.opensearch.cluster.health.ClusterHealthStatus; import org.opensearch.cluster.metadata.IndexMetadata; @@ -26,6 +28,8 @@ import java.util.Collection; import java.util.List; +import static java.lang.Thread.sleep; +import static org.opensearch.indices.IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING; import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse; @@ -40,7 +44,7 @@ public CompositeAggIT(Settings staticSettings) { @ParametersFactory public static Collection parameters() { return Arrays.asList( - new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build() }, + new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() }, new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() } ); } @@ -50,26 +54,50 @@ public void setupSuiteScopeCluster() throws Exception { assertAcked( prepareCreate( "idx", - Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), false) ).setMapping("type", "type=keyword", "num", "type=integer", "score", "type=integer") ); waitForRelocation(ClusterHealthStatus.GREEN); - client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "1", "score", "5").get(); - client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "11", "score", "50").get(); - refresh("idx"); - client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "1", "score", "2").get(); - client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "12", "score", "20").get(); - refresh("idx"); - client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "3", "score", "10").get(); - client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "13", "score", "15").get(); - refresh("idx"); - client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "3", "score", "1").get(); - client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "13", "score", "100").get(); - refresh("idx"); + indexRandom( + true, + client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "1", "score", "5"), + client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "11", "score", "50"), + client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "1", "score", "2"), + client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "12", "score", "20"), + client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "3", "score", "10"), + client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "13", "score", "15"), + client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "3", "score", "1"), + client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "13", "score", "100") + ); + + forceMerge(1); + sleep(1000); + + // client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "1", "score", "5").get(); + // client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "11", "score", "50").get(); + // refresh("idx"); + // client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "1", "score", "2").get(); + // client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "12", "score", "20").get(); + // refresh("idx"); + // client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "3", "score", "10").get(); + // client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "13", "score", "15").get(); + // refresh("idx"); + // client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "3", "score", "1").get(); + // client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "13", "score", "100").get(); + // refresh("idx"); waitForRelocation(ClusterHealthStatus.GREEN); - refresh(); + // refresh(); + + IndicesSegmentResponse segmentResponse = client().admin().indices().prepareSegments("idx").get(); + System.out.println("Segments: " + segmentResponse.getIndices().get("idx").getShards().get(0).getShards()[0].getSegments().size()); + + GetSettingsResponse settingsResponse = client().admin().indices().prepareGetSettings("idx").get(); + System.out.println("Settings: " + settingsResponse); } public void testCompositeAggWithNoSubAgg() { @@ -88,6 +116,7 @@ public void testCompositeAggWithSubAgg() { ) .get(); assertSearchResponse(rsp); + System.out.println(rsp); } private List> getTestValueSources() { diff --git a/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/TermsDocCountErrorIT.java b/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/TermsDocCountErrorIT.java index add6b71cb1753..383ce5f7973b8 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/TermsDocCountErrorIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/TermsDocCountErrorIT.java @@ -73,7 +73,8 @@ public class TermsDocCountErrorIT extends ParameterizedStaticSettingsOpenSearchI private static final String DOUBLE_FIELD_NAME = "d_value"; public static String randomExecutionHint() { - return randomBoolean() ? null : randomFrom(ExecutionMode.values()).toString(); +// return randomBoolean() ? null : randomFrom(ExecutionMode.values()).toString(); + return "global_ordinals"; } private static int numRoutingValues; diff --git a/server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java b/server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java index 0bb2d1d7ca933..705287c401775 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java +++ b/server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java @@ -18,6 +18,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Objects; /** * Common {@link CollectorManager} used by both concurrent and non-concurrent aggregation path and also for global and non-global @@ -54,19 +55,40 @@ public String getCollectorReason() { public abstract String getCollectorName(); +// @Override +// public ReduceableSearchResult reduce(Collection collectors) throws IOException { +// final List aggregators = context.bucketCollectorProcessor().toAggregators(collectors); +// final List internals = new ArrayList<>(aggregators.size()); +// context.aggregations().resetBucketMultiConsumer(); +// for (Aggregator aggregator : aggregators) { +// try { +// // post collection is called in ContextIndexSearcher after search on leaves are completed +// internals.add(aggregator.buildTopLevel()); +// } catch (IOException e) { +// throw new AggregationExecutionException("Failed to build aggregation [" + aggregator.name() + "]", e); +// } +// } +// +// final InternalAggregations internalAggregations = InternalAggregations.from(internals); +// return buildAggregationResult(internalAggregations); +// } + @Override public ReduceableSearchResult reduce(Collection collectors) throws IOException { - final List aggregators = context.bucketCollectorProcessor().toAggregators(collectors); - final List internals = new ArrayList<>(aggregators.size()); - context.aggregations().resetBucketMultiConsumer(); - for (Aggregator aggregator : aggregators) { - try { - // post collection is called in ContextIndexSearcher after search on leaves are completed - internals.add(aggregator.buildTopLevel()); - } catch (IOException e) { - throw new AggregationExecutionException("Failed to build aggregation [" + aggregator.name() + "]", e); + List internals = context.bucketCollectorProcessor().toInternalAggregations(collectors); + + // collect does not get called whenever there are no leaves on a shard. Since we build the InternalAggregation in postCollection now, + // that will not get called in such cases. Therefore we need to manually call it again here to build empty Internal Aggregation objects for this collector tree. + if (internals.stream().allMatch(Objects::isNull)) { + for (Collector c : collectors) { + if (c instanceof AggregatorBase) { + ((AggregatorBase) c ).buildAndSetInternalAggregation(); + } } + internals = context.bucketCollectorProcessor().toInternalAggregations(collectors); } + // TODO: Assert nothing in internals is null + context.aggregations().resetBucketMultiConsumer(); // Not sure if this is thread safe final InternalAggregations internalAggregations = InternalAggregations.from(internals); return buildAggregationResult(internalAggregations); diff --git a/server/src/main/java/org/opensearch/search/aggregations/AggregatorBase.java b/server/src/main/java/org/opensearch/search/aggregations/AggregatorBase.java index 47e9def094623..71f336ba9746c 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/AggregatorBase.java +++ b/server/src/main/java/org/opensearch/search/aggregations/AggregatorBase.java @@ -34,6 +34,7 @@ import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.ScoreMode; +import org.opensearch.common.SetOnce; import org.opensearch.core.common.breaker.CircuitBreaker; import org.opensearch.core.common.breaker.CircuitBreakingException; import org.opensearch.core.indices.breaker.CircuitBreakerService; @@ -72,6 +73,8 @@ public abstract class AggregatorBase extends Aggregator { private final CircuitBreakerService breakerService; private long requestBytesUsed; + private final SetOnce internalAggregation = new SetOnce<>(); + /** * Constructs a new Aggregator. * @@ -279,6 +282,13 @@ public void postCollection() throws IOException { collectableSubAggregators.postCollection(); } + public void buildAndSetInternalAggregation() throws IOException { + // Only call buildTopLevel for top level aggregators. This will subsequently build aggregations for child aggs. + if (parent == null) { + internalAggregation.set(buildTopLevel()); + } + } + /** Called upon release of the aggregator. */ @Override public void close() { @@ -305,6 +315,10 @@ protected final InternalAggregations buildEmptySubAggregations() { return InternalAggregations.from(aggs); } + public InternalAggregation getInternalAggregation() { + return internalAggregation.get(); + } + @Override public String toString() { return name; diff --git a/server/src/main/java/org/opensearch/search/aggregations/BucketCollectorProcessor.java b/server/src/main/java/org/opensearch/search/aggregations/BucketCollectorProcessor.java index 135fda71a757a..f259b180efe22 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/BucketCollectorProcessor.java +++ b/server/src/main/java/org/opensearch/search/aggregations/BucketCollectorProcessor.java @@ -72,6 +72,11 @@ public void processPostCollection(Collector collectorTree) throws IOException { } } else if (currentCollector instanceof BucketCollector) { ((BucketCollector) currentCollector).postCollection(); + + // Call buildTopLevel here + if (currentCollector instanceof AggregatorBase) { + ((AggregatorBase) currentCollector).buildAndSetInternalAggregation(); + } } } } @@ -106,4 +111,29 @@ public List toAggregators(Collection collectors) { } return aggregators; } + + public List toInternalAggregations(Collection collectors) { + List internalAggregations = new ArrayList<>(); + + final Deque allCollectors = new LinkedList<>(collectors); + while (!allCollectors.isEmpty()) { + final Collector currentCollector = allCollectors.pop(); + if (currentCollector instanceof AggregatorBase) { + internalAggregations.add(((AggregatorBase) currentCollector).getInternalAggregation()); + } else if (currentCollector instanceof InternalProfileCollector) { + if (((InternalProfileCollector) currentCollector).getCollector() instanceof Aggregator) { + internalAggregations.add( + ((AggregatorBase) ((InternalProfileCollector) currentCollector).getCollector()).getInternalAggregation() + ); + } else if (((InternalProfileCollector) currentCollector).getCollector() instanceof MultiBucketCollector) { + allCollectors.addAll( + Arrays.asList(((MultiBucketCollector) ((InternalProfileCollector) currentCollector).getCollector()).getCollectors()) + ); + } + } else if (currentCollector instanceof MultiBucketCollector) { + allCollectors.addAll(Arrays.asList(((MultiBucketCollector) currentCollector).getCollectors())); + } + } + return internalAggregations; + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/NonGlobalAggCollectorManagerWithSingleCollector.java b/server/src/main/java/org/opensearch/search/aggregations/NonGlobalAggCollectorManagerWithSingleCollector.java index a6eb00f2d70f7..64fcdb8c50477 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/NonGlobalAggCollectorManagerWithSingleCollector.java +++ b/server/src/main/java/org/opensearch/search/aggregations/NonGlobalAggCollectorManagerWithSingleCollector.java @@ -39,9 +39,11 @@ public Collector newCollector() throws IOException { return collector; } + // Can we add post collection logic in here? @Override public ReduceableSearchResult reduce(Collection collectors) throws IOException { assert collectors.isEmpty() : "Reduce on NonGlobalAggregationCollectorManagerWithCollector called with non-empty collectors"; + return super.reduce(List.of(collector)); } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregationFactory.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregationFactory.java index 4af14ab014db5..6dcdf9b98c7f4 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregationFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregationFactory.java @@ -81,6 +81,6 @@ protected Aggregator createInternal( @Override protected boolean supportsConcurrentSegmentSearch() { // See https://github.com/opensearch-project/OpenSearch/issues/12331 for details - return false; + return true; } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java index b97c814cdf645..3709508c6a03b 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java @@ -107,7 +107,7 @@ final class CompositeAggregator extends BucketsAggregator { private final CompositeValuesSourceConfig[] sourceConfigs; private final SingleDimensionValuesSource[] sources; - private final CompositeValuesCollectorQueue queue; + private CompositeValuesCollectorQueue queue; private final List entries = new ArrayList<>(); private LeafReaderContext currentLeaf; @@ -236,6 +236,16 @@ protected void doPreCollection() throws IOException { @Override protected void doPostCollection() throws IOException { finishLeaf(); + // Re-create the ValuesSource on the search thread for concurrent search + // for (int i = 0; i < sourceConfigs.length; i++) { + // this.sources[i] = sourceConfigs[i].createValuesSource( + // context.bigArrays(), + // context.searcher().getIndexReader(), + // size, + // this::addRequestCircuitBreakerBytes + // ); + // } + // this.queue = new CompositeValuesCollectorQueue(context.bigArrays(), sources, size, rawAfterKey); } @Override diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java index 5ed899408ab40..69ab2ee616299 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java @@ -90,6 +90,8 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr protected int segmentsWithSingleValuedOrds = 0; protected int segmentsWithMultiValuedOrds = 0; + protected final IndexReader reader; + /** * Lookup global ordinals * @@ -119,7 +121,9 @@ public GlobalOrdinalsStringTermsAggregator( super(name, factories, context, parent, order, format, bucketCountThresholds, collectionMode, showTermDocCountError, metadata); this.resultStrategy = resultStrategy.apply(this); // ResultStrategy needs a reference to the Aggregator to do its job. this.valuesSource = valuesSource; - final IndexReader reader = context.searcher().getIndexReader(); + reader = context.searcher().getIndexReader(); + // valuesSource is shared across aggregators and the DocValues here are created when the collector is created. + // Need to delay this creation to when it's actually used in the index_search thread. final SortedSetDocValues values = reader.leaves().size() > 0 ? valuesSource.globalOrdinalsValues(context.searcher().getIndexReader().leaves().get(0)) : DocValues.emptySortedSet(); @@ -142,6 +146,8 @@ String descriptCollectionStrategy() { return collectionStrategy.describe(); } +// public + @Override public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException { SortedSetDocValues globalOrds = valuesSource.globalOrdinalsValues(ctx); @@ -776,7 +782,15 @@ PriorityQueue buildPriorityQueue(int size) { } StringTerms.Bucket convertTempBucketToRealBucket(OrdBucket temp) throws IOException { - BytesRef term = BytesRef.deepCopyOf(lookupGlobalOrd.apply(temp.globalOrd)); +// BytesRef term = BytesRef.deepCopyOf(lookupGlobalOrd.apply(temp.globalOrd)); + SortedSetDocValues values = reader.leaves().size() > 0 + ? valuesSource.globalOrdinalsValues(context.searcher().getIndexReader().leaves().get(0)) + : DocValues.emptySortedSet(); + BytesRef term = BytesRef.deepCopyOf( + values.lookupOrd(temp.globalOrd) + ); + + StringTerms.Bucket result = new StringTerms.Bucket(term, temp.docCount, null, showTermDocCountError, 0, format); result.bucketOrd = temp.bucketOrd; result.docCountError = 0; @@ -892,7 +906,13 @@ BucketUpdater bucketUpdater(long owningBucketOrd) long subsetSize = subsetSize(owningBucketOrd); return (spare, globalOrd, bucketOrd, docCount) -> { spare.bucketOrd = bucketOrd; - oversizedCopy(lookupGlobalOrd.apply(globalOrd), spare.termBytes); +// oversizedCopy(lookupGlobalOrd.apply(globalOrd), spare.termBytes); + SortedSetDocValues values = reader.leaves().size() > 0 + ? valuesSource.globalOrdinalsValues(context.searcher().getIndexReader().leaves().get(0)) + : DocValues.emptySortedSet(); + oversizedCopy( + values.lookupOrd(globalOrd), + spare.termBytes); spare.subsetDf = docCount; spare.subsetSize = subsetSize; spare.supersetDf = backgroundFrequencies.freq(spare.termBytes); diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchTestCase.java index aac3fca9e1e16..20abd04f2129a 100644 --- a/test/framework/src/main/java/org/opensearch/test/OpenSearchTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchTestCase.java @@ -211,7 +211,13 @@ "LuceneFixedGap", "LuceneVarGapFixedInterval", "LuceneVarGapDocFreqInterval", - "Lucene50" }) + "Lucene50", + "Lucene90", + "Lucene94", + "Lucene90", + "Lucene95", + "Lucene99" +}) @LuceneTestCase.SuppressReproduceLine public abstract class OpenSearchTestCase extends LuceneTestCase {