From 4fad4702e4f986f93eda88134c49225625cc21ac Mon Sep 17 00:00:00 2001 From: Jay Deng Date: Fri, 2 Aug 2024 15:58:36 -0700 Subject: [PATCH] Support scripting for composite aggs in concurrent segment search (#15072) (#15094) Signed-off-by: Jay Deng --- CHANGELOG.md | 1 + modules/lang-painless/build.gradle | 1 + .../opensearch/painless/SimplePainlessIT.java | 231 ++++++++++++++++++ .../CompositeAggregationFactory.java | 4 +- .../search/lookup/SearchLookup.java | 13 +- .../search/lookup/SourceLookup.java | 2 +- 6 files changed, 247 insertions(+), 5 deletions(-) create mode 100644 modules/lang-painless/src/internalClusterTest/java/org/opensearch/painless/SimplePainlessIT.java diff --git a/CHANGELOG.md b/CHANGELOG.md index d54722a141959..2dec559228113 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add basic aggregation support for derived fields ([#14618](https://github.com/opensearch-project/OpenSearch/pull/14618)) - Add ThreadContextPermission for markAsSystemContext and allow core to perform the method ([#15016](https://github.com/opensearch-project/OpenSearch/pull/15016)) - Add ThreadContextPermission for stashAndMergeHeaders and stashWithOrigin ([#15039](https://github.com/opensearch-project/OpenSearch/pull/15039)) +- [Concurrent Segment Search] Support composite aggregations with scripting ([#15072](https://github.com/opensearch-project/OpenSearch/pull/15072)) - Add `rangeQuery` and `regexpQuery` for `constant_keyword` field type ([#14711](https://github.com/opensearch-project/OpenSearch/pull/14711)) ### Dependencies diff --git a/modules/lang-painless/build.gradle b/modules/lang-painless/build.gradle index 71e922392e39d..7cc4cec24a975 100644 --- a/modules/lang-painless/build.gradle +++ b/modules/lang-painless/build.gradle @@ -33,6 +33,7 @@ import com.github.jengelman.gradle.plugins.shadow.ShadowBasePlugin apply plugin: 'opensearch.validate-rest-spec' apply plugin: 'opensearch.yaml-rest-test' +apply plugin: 'opensearch.internal-cluster-test' opensearchplugin { description 'An easy, safe and fast scripting language for OpenSearch' diff --git a/modules/lang-painless/src/internalClusterTest/java/org/opensearch/painless/SimplePainlessIT.java b/modules/lang-painless/src/internalClusterTest/java/org/opensearch/painless/SimplePainlessIT.java new file mode 100644 index 0000000000000..7c3caa2b9cb20 --- /dev/null +++ b/modules/lang-painless/src/internalClusterTest/java/org/opensearch/painless/SimplePainlessIT.java @@ -0,0 +1,231 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.painless; + +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; + +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.action.support.WriteRequest; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.core.xcontent.MediaTypeRegistry; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.index.query.TermsQueryBuilder; +import org.opensearch.plugins.Plugin; +import org.opensearch.script.Script; +import org.opensearch.script.ScriptType; +import org.opensearch.search.aggregations.AggregationBuilder; +import org.opensearch.search.aggregations.AggregationBuilders; +import org.opensearch.search.aggregations.bucket.composite.InternalComposite; +import org.opensearch.search.aggregations.bucket.composite.TermsValuesSourceBuilder; +import org.opensearch.search.aggregations.bucket.terms.Terms; +import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder; +import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +import static org.opensearch.index.query.QueryBuilders.matchAllQuery; +import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING; +import static org.opensearch.search.SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse; + +@OpenSearchIntegTestCase.SuiteScopeTestCase +public class SimplePainlessIT extends ParameterizedStaticSettingsOpenSearchIntegTestCase { + + public SimplePainlessIT(Settings nodeSettings) { + super(nodeSettings); + } + + @ParametersFactory + public static Collection parameters() { + return Arrays.asList( + new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() }, + new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build() } + ); + } + + @Override + protected Collection> nodePlugins() { + return List.of(PainlessPlugin.class); + } + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put(CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING.getKey(), "4") + .build(); + } + + @Override + public void setupSuiteScopeCluster() throws Exception { + XContentBuilder xContentBuilder = XContentFactory.jsonBuilder() + .startObject() + .field("dynamic", "false") + .startObject("_meta") + .field("schema_version", 5) + .endObject() + .startObject("properties") + .startObject("entity") + .field("type", "nested") + .endObject() + .endObject() + .endObject(); + + assertAcked( + prepareCreate("test").setMapping(xContentBuilder) + .setSettings( + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + ) + ); + + assertAcked( + prepareCreate("test-df").setSettings( + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + ) + ); + + client().prepareIndex("test") + .setId("a") + .setSource( + "{\"entity\":[{\"name\":\"ip-field\",\"value\":\"1.2.3.4\"},{\"name\":\"keyword-field\",\"value\":\"field-1\"}]}", + MediaTypeRegistry.JSON + ) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .get(); + client().prepareIndex("test") + .setId("b") + .setSource( + "{\"entity\":[{\"name\":\"ip-field\",\"value\":\"5.6.7.8\"},{\"name\":\"keyword-field\",\"value\":\"field-2\"}]}", + MediaTypeRegistry.JSON + ) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .get(); + client().prepareIndex("test") + .setId("c") + .setSource( + "{\"entity\":[{\"name\":\"ip-field\",\"value\":\"1.6.3.8\"},{\"name\":\"keyword-field\",\"value\":\"field-2\"}]}", + MediaTypeRegistry.JSON + ) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .get(); + client().prepareIndex("test") + .setId("d") + .setSource( + "{\"entity\":[{\"name\":\"ip-field\",\"value\":\"2.6.4.8\"},{\"name\":\"keyword-field\",\"value\":\"field-2\"}]}", + MediaTypeRegistry.JSON + ) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .get(); + ensureSearchable("test"); + + client().prepareIndex("test-df") + .setId("a") + .setSource("{\"field\":\"value1\"}", MediaTypeRegistry.JSON) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .get(); + client().prepareIndex("test-df") + .setId("b") + .setSource("{\"field\":\"value2\"}", MediaTypeRegistry.JSON) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .get(); + client().prepareIndex("test-df") + .setId("c") + .setSource("{\"field\":\"value3\"}", MediaTypeRegistry.JSON) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .get(); + client().prepareIndex("test-df") + .setId("d") + .setSource("{\"field\":\"value1\"}", MediaTypeRegistry.JSON) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .get(); + ensureSearchable("test"); + } + + public void testTermsValuesSource() throws Exception { + AggregationBuilder agg = AggregationBuilders.composite( + "multi_buckets", + Collections.singletonList( + new TermsValuesSourceBuilder("keyword-field").script( + new Script( + ScriptType.INLINE, + "painless", + "String value = null; if (params == null || params._source == null || params._source.entity == null) { return \"\"; } for (item in params._source.entity) { if (item[\"name\"] == \"keyword-field\") { value = item['value']; break; } } return value;", + Collections.emptyMap() + ) + ) + ) + ); + SearchResponse response = client().prepareSearch("test").setQuery(matchAllQuery()).addAggregation(agg).get(); + + assertSearchResponse(response); + assertEquals(2, ((InternalComposite) response.getAggregations().get("multi_buckets")).getBuckets().size()); + assertEquals( + "field-1", + ((InternalComposite) response.getAggregations().get("multi_buckets")).getBuckets().get(0).getKey().get("keyword-field") + ); + assertEquals(1, ((InternalComposite) response.getAggregations().get("multi_buckets")).getBuckets().get(0).getDocCount()); + assertEquals( + "field-2", + ((InternalComposite) response.getAggregations().get("multi_buckets")).getBuckets().get(1).getKey().get("keyword-field") + ); + assertEquals(3, ((InternalComposite) response.getAggregations().get("multi_buckets")).getBuckets().get(1).getDocCount()); + } + + public void testSimpleDerivedFieldsQuery() { + assumeFalse( + "Derived fields do not support concurrent search https://github.com/opensearch-project/OpenSearch/issues/15007", + internalCluster().clusterService().getClusterSettings().get(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING) + ); + SearchRequest searchRequest = new SearchRequest("test-df").source( + SearchSourceBuilder.searchSource() + .derivedField("result", "keyword", new Script("emit(params._source[\"field\"])")) + .fetchField("result") + .query(new TermsQueryBuilder("result", "value1")) + ); + SearchResponse response = client().search(searchRequest).actionGet(); + assertSearchResponse(response); + assertEquals(2, Objects.requireNonNull(response.getHits().getTotalHits()).value); + } + + public void testSimpleDerivedFieldsAgg() { + assumeFalse( + "Derived fields do not support concurrent search https://github.com/opensearch-project/OpenSearch/issues/15007", + internalCluster().clusterService().getClusterSettings().get(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING) + ); + SearchRequest searchRequest = new SearchRequest("test-df").source( + SearchSourceBuilder.searchSource() + .derivedField("result", "keyword", new Script("emit(params._source[\"field\"])")) + .fetchField("result") + .aggregation(new TermsAggregationBuilder("derived-agg").field("result")) + ); + SearchResponse response = client().search(searchRequest).actionGet(); + assertSearchResponse(response); + Terms aggResponse = response.getAggregations().get("derived-agg"); + assertEquals(3, aggResponse.getBuckets().size()); + Terms.Bucket bucket = aggResponse.getBuckets().get(0); + assertEquals("value1", bucket.getKey()); + assertEquals(2, bucket.getDocCount()); + bucket = aggResponse.getBuckets().get(1); + assertEquals("value2", bucket.getKey()); + assertEquals(1, bucket.getDocCount()); + bucket = aggResponse.getBuckets().get(2); + assertEquals("value3", bucket.getKey()); + assertEquals(1, bucket.getDocCount()); + } +} diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregationFactory.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregationFactory.java index 6c5619a843fae..2ff79fb623def 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregationFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregationFactory.java @@ -40,7 +40,6 @@ import org.opensearch.search.internal.SearchContext; import java.io.IOException; -import java.util.Arrays; import java.util.Map; /** @@ -81,7 +80,6 @@ protected Aggregator createInternal( @Override protected boolean supportsConcurrentSegmentSearch() { - // Disable concurrent search if any scripting is used. See https://github.com/opensearch-project/OpenSearch/issues/12331 for details - return Arrays.stream(sources).noneMatch(CompositeValuesSourceConfig::hasScript); + return true; } } diff --git a/server/src/main/java/org/opensearch/search/lookup/SearchLookup.java b/server/src/main/java/org/opensearch/search/lookup/SearchLookup.java index 906616eb9ba5f..dff8fae1a9ad1 100644 --- a/server/src/main/java/org/opensearch/search/lookup/SearchLookup.java +++ b/server/src/main/java/org/opensearch/search/lookup/SearchLookup.java @@ -153,14 +153,25 @@ public final SearchLookup forkAndTrackFieldReferences(String field) { return new SearchLookup(this, newFieldChain); } + /** + * SourceLookup is not thread safe, so we create a new instance for each leaf to support concurrent segment search + */ public LeafSearchLookup getLeafSearchLookup(LeafReaderContext context) { - return new LeafSearchLookup(context, docMap.getLeafDocLookup(context), sourceLookup, fieldsLookup.getLeafFieldsLookup(context)); + return new LeafSearchLookup( + context, + docMap.getLeafDocLookup(context), + new SourceLookup(), + fieldsLookup.getLeafFieldsLookup(context) + ); } public DocLookup doc() { return docMap; } + /** + * Returned SourceLookup will be unrelated to any created LeafSearchLookups. Instead, use {@link LeafSearchLookup#source()} to access the related {@link SearchLookup}. + */ public SourceLookup source() { return sourceLookup; } diff --git a/server/src/main/java/org/opensearch/search/lookup/SourceLookup.java b/server/src/main/java/org/opensearch/search/lookup/SourceLookup.java index cbac29fde7932..4644bcb3d9b92 100644 --- a/server/src/main/java/org/opensearch/search/lookup/SourceLookup.java +++ b/server/src/main/java/org/opensearch/search/lookup/SourceLookup.java @@ -57,7 +57,7 @@ import static java.util.Collections.emptyMap; /** - * Orchestrator class for source lookups + * Orchestrator class for source lookups. Not thread safe. * * @opensearch.api */