diff --git a/.github/workflows/test_aggregations.yml b/.github/workflows/test_aggregations.yml new file mode 100644 index 000000000..3bd5a66c0 --- /dev/null +++ b/.github/workflows/test_aggregations.yml @@ -0,0 +1,71 @@ +name: Run Additional Tests for Neural Search +on: + schedule: + - cron: '0 0 * * *' # every night + push: + branches: + - "*" + - "feature/**" + pull_request: + branches: + - "*" + - "feature/**" + +jobs: + Get-CI-Image-Tag: + uses: opensearch-project/opensearch-build/.github/workflows/get-ci-image-tag.yml@main + with: + product: opensearch + + Check-neural-search-linux: + needs: Get-CI-Image-Tag + strategy: + matrix: + java: [11, 17, 21] + os: [ubuntu-latest] + + name: Integ Tests Linux + runs-on: ${{ matrix.os }} + container: + # using the same image which is used by opensearch-build team to build the OpenSearch Distribution + # this image tag is subject to change as more dependencies and updates will arrive over time + image: ${{ needs.Get-CI-Image-Tag.outputs.ci-image-version-linux }} + # need to switch to root so that github actions can install runner binary on container without permission issues. 
+ options: --user root + + + steps: + - name: Checkout neural-search + uses: actions/checkout@v1 + + - name: Setup Java ${{ matrix.java }} + uses: actions/setup-java@v1 + with: + java-version: ${{ matrix.java }} + + - name: Run tests + run: | + chown -R 1000:1000 `pwd` + su `id -un 1000` -c "./gradlew ':integTest' -Dtest_aggs=true --tests \"org.opensearch.neuralsearch.query.aggregation.*IT\"" + + Check-neural-search-windows: + strategy: + matrix: + java: [11, 17, 21] + os: [windows-latest] + + name: Integ Tests Windows + runs-on: ${{ matrix.os }} + + steps: + - name: Checkout neural-search + uses: actions/checkout@v1 + + - name: Setup Java ${{ matrix.java }} + uses: actions/setup-java@v1 + with: + java-version: ${{ matrix.java }} + + - name: Run tests + run: | + ./gradlew ':integTest' -Dtest_aggs=true --tests "org.opensearch.neuralsearch.query.aggregation.*IT" diff --git a/CHANGELOG.md b/CHANGELOG.md index 57a2f7a4d..e52c15c77 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Fix typo for sparse encoding processor factory([#578](https://github.com/opensearch-project/neural-search/pull/578)) - Add non-null check for queryBuilder in NeuralQueryEnricherProcessor ([#615](https://github.com/opensearch-project/neural-search/pull/615)) ### Infrastructure +- Adding integ tests for aggregations when they're bundled with hybrid query ([#632](https://github.com/opensearch-project/neural-search/pull/632)) ### Documentation ### Maintenance ### Refactoring diff --git a/DEVELOPER_GUIDE.md b/DEVELOPER_GUIDE.md index acbb00883..47ae31be6 100644 --- a/DEVELOPER_GUIDE.md +++ b/DEVELOPER_GUIDE.md @@ -181,6 +181,11 @@ Additionally, to run integration tests on multi nodes with security enabled, run ./gradlew :integTest -Dsecurity.enabled=true -PnumNodes=3 ``` +Some integration tests are skipped by default, mainly to save time and resources. 
A special parameter is required to include those tests in the executed test suite. For example, the following command enables additional tests for aggregations when they are bundled with hybrid queries: +``` +./gradlew :integTest -PnumNodes=3 -Dtest_aggs=true +``` + Integration tests can be run with remote cluster. For that run the following command and replace host/port/cluster name values with ones for the target cluster: ``` diff --git a/build.gradle b/build.gradle index 1bf27837c..906e21317 100644 --- a/build.gradle +++ b/build.gradle @@ -305,6 +305,12 @@ task integTest(type: RestIntegTestTask) { description = "Run tests against a cluster" testClassesDirs = sourceSets.test.output.classesDirs classpath = sourceSets.test.runtimeClasspath + boolean runCompleteAggsTestSuite = Boolean.parseBoolean(System.getProperty('test_aggs', "false")) + if (!runCompleteAggsTestSuite) { + filter { + excludeTestsMatching "org.opensearch.neuralsearch.query.aggregation.*IT" + } + } } tasks.named("check").configure { dependsOn(integTest) } diff --git a/src/test/java/org/opensearch/neuralsearch/query/aggregation/BaseAggregationsWithHybridQueryIT.java b/src/test/java/org/opensearch/neuralsearch/query/aggregation/BaseAggregationsWithHybridQueryIT.java new file mode 100644 index 000000000..44d8438af --- /dev/null +++ b/src/test/java/org/opensearch/neuralsearch/query/aggregation/BaseAggregationsWithHybridQueryIT.java @@ -0,0 +1,240 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.neuralsearch.query.aggregation; + +import lombok.SneakyThrows; +import org.junit.Before; +import org.opensearch.neuralsearch.BaseNeuralSearchIT; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.IntStream; + +import static org.opensearch.neuralsearch.TestUtils.RELATION_EQUAL_TO; +import static 
org.opensearch.neuralsearch.util.AggregationsTestUtils.getNestedHits; +import static org.opensearch.neuralsearch.util.AggregationsTestUtils.getTotalHits; + +public class BaseAggregationsWithHybridQueryIT extends BaseNeuralSearchIT { + protected static final String TEST_DOC_TEXT1 = "Hello world"; + protected static final String TEST_DOC_TEXT2 = "Hi to this place"; + protected static final String TEST_DOC_TEXT3 = "We would like to welcome everyone"; + protected static final String TEST_DOC_TEXT4 = "Hello, I'm glad to you see you pal"; + protected static final String TEST_DOC_TEXT5 = "People keep telling me orange but I still prefer pink"; + protected static final String TEST_DOC_TEXT6 = "She traveled because it cost the same as therapy and was a lot more enjoyable"; + protected static final String TEST_TEXT_FIELD_NAME_1 = "test-text-field-1"; + protected static final String TEST_QUERY_TEXT3 = "hello"; + protected static final String TEST_QUERY_TEXT4 = "cost"; + protected static final String TEST_QUERY_TEXT5 = "welcome"; + protected static final String TEST_NESTED_TYPE_FIELD_NAME_1 = "user"; + protected static final String NESTED_FIELD_1 = "firstname"; + protected static final String NESTED_FIELD_2 = "lastname"; + protected static final String NESTED_FIELD_1_VALUE_1 = "john"; + protected static final String NESTED_FIELD_2_VALUE_1 = "black"; + protected static final String NESTED_FIELD_1_VALUE_2 = "frodo"; + protected static final String NESTED_FIELD_2_VALUE_2 = "baggins"; + protected static final String NESTED_FIELD_1_VALUE_3 = "mohammed"; + protected static final String NESTED_FIELD_2_VALUE_3 = "ezab"; + protected static final String NESTED_FIELD_1_VALUE_4 = "sun"; + protected static final String NESTED_FIELD_2_VALUE_4 = "wukong"; + protected static final String NESTED_FIELD_1_VALUE_5 = "vasilisa"; + protected static final String NESTED_FIELD_2_VALUE_5 = "the wise"; + protected static final String INTEGER_FIELD_1 = "doc_index"; + protected static final int 
INTEGER_FIELD_1_VALUE = 1234; + protected static final int INTEGER_FIELD_2_VALUE = 2345; + protected static final int INTEGER_FIELD_3_VALUE = 3456; + protected static final int INTEGER_FIELD_4_VALUE = 4567; + protected static final String KEYWORD_FIELD_1 = "doc_keyword"; + protected static final String KEYWORD_FIELD_1_VALUE = "workable"; + protected static final String KEYWORD_FIELD_2_VALUE = "angry"; + protected static final String KEYWORD_FIELD_3_VALUE = "likeable"; + protected static final String KEYWORD_FIELD_4_VALUE = "entire"; + protected static final String DATE_FIELD_1 = "doc_date"; + protected static final String DATE_FIELD_1_VALUE = "01/03/1995"; + private static final String DATE_FIELD_2_VALUE = "05/02/2015"; + protected static final String DATE_FIELD_3_VALUE = "07/23/2007"; + protected static final String DATE_FIELD_4_VALUE = "08/21/2012"; + protected static final String INTEGER_FIELD_PRICE = "doc_price"; + protected static final int INTEGER_FIELD_PRICE_1_VALUE = 130; + protected static final int INTEGER_FIELD_PRICE_2_VALUE = 100; + protected static final int INTEGER_FIELD_PRICE_3_VALUE = 200; + protected static final int INTEGER_FIELD_PRICE_4_VALUE = 25; + protected static final int INTEGER_FIELD_PRICE_5_VALUE = 30; + protected static final int INTEGER_FIELD_PRICE_6_VALUE = 350; + protected static final String BUCKET_AGG_DOC_COUNT_FIELD = "doc_count"; + protected static final String BUCKETS_AGGREGATION_NAME_1 = "date_buckets_1"; + protected static final String BUCKETS_AGGREGATION_NAME_2 = "date_buckets_2"; + protected static final String BUCKETS_AGGREGATION_NAME_3 = "date_buckets_3"; + protected static final String BUCKETS_AGGREGATION_NAME_4 = "date_buckets_4"; + protected static final String KEY = "key"; + protected static final String BUCKET_AGG_KEY_AS_STRING = "key_as_string"; + protected static final String SUM_AGGREGATION_NAME = "sum_aggs"; + protected static final String SUM_AGGREGATION_NAME_2 = "sum_aggs_2"; + protected static final String 
AVG_AGGREGATION_NAME = "avg_field"; + protected static final String GENERIC_AGGREGATION_NAME = "my_aggregation"; + protected static final String DATE_AGGREGATION_NAME = "date_aggregation"; + protected static final String CLUSTER_SETTING_CONCURRENT_SEGMENT_SEARCH = "search.concurrent_segment_search.enabled"; + + @Before + public void setUp() throws Exception { + super.setUp(); + updateClusterSettings(); + } + + @Override + public boolean isUpdateClusterSettings() { + return false; + } + + @Override + protected boolean preserveClusterUponCompletion() { + return true; + } + + protected void prepareResources(String indexName, String pipelineName) { + initializeIndexIfNotExist(indexName); + createSearchPipelineWithResultsPostProcessor(pipelineName); + } + + @SneakyThrows + protected void initializeIndexIfNotExist(String indexName) { + if (!indexExists(indexName)) { + createIndexWithConfiguration( + indexName, + buildIndexConfiguration( + List.of(), + List.of(TEST_NESTED_TYPE_FIELD_NAME_1, NESTED_FIELD_1, NESTED_FIELD_2), + List.of(INTEGER_FIELD_1), + List.of(KEYWORD_FIELD_1), + List.of(DATE_FIELD_1), + 3 + ), + "" + ); + + addKnnDoc( + indexName, + "1", + List.of(), + List.of(), + Collections.singletonList(TEST_TEXT_FIELD_NAME_1), + Collections.singletonList(TEST_DOC_TEXT1), + List.of(TEST_NESTED_TYPE_FIELD_NAME_1), + List.of(Map.of(NESTED_FIELD_1, NESTED_FIELD_1_VALUE_1, NESTED_FIELD_2, NESTED_FIELD_2_VALUE_1)), + List.of(INTEGER_FIELD_1, INTEGER_FIELD_PRICE), + List.of(INTEGER_FIELD_1_VALUE, INTEGER_FIELD_PRICE_1_VALUE), + List.of(KEYWORD_FIELD_1), + List.of(KEYWORD_FIELD_1_VALUE), + List.of(DATE_FIELD_1), + List.of(DATE_FIELD_1_VALUE) + ); + addKnnDoc( + indexName, + "2", + List.of(), + List.of(), + Collections.singletonList(TEST_TEXT_FIELD_NAME_1), + Collections.singletonList(TEST_DOC_TEXT3), + List.of(TEST_NESTED_TYPE_FIELD_NAME_1), + List.of(Map.of(NESTED_FIELD_1, NESTED_FIELD_1_VALUE_2, NESTED_FIELD_2, NESTED_FIELD_2_VALUE_2)), + List.of(INTEGER_FIELD_1, 
INTEGER_FIELD_PRICE), + List.of(INTEGER_FIELD_2_VALUE, INTEGER_FIELD_PRICE_2_VALUE), + List.of(), + List.of(), + List.of(DATE_FIELD_1), + List.of(DATE_FIELD_2_VALUE) + ); + addKnnDoc( + indexName, + "3", + List.of(), + List.of(), + Collections.singletonList(TEST_TEXT_FIELD_NAME_1), + Collections.singletonList(TEST_DOC_TEXT2), + List.of(TEST_NESTED_TYPE_FIELD_NAME_1), + List.of(Map.of(NESTED_FIELD_1, NESTED_FIELD_1_VALUE_3, NESTED_FIELD_2, NESTED_FIELD_2_VALUE_3)), + List.of(INTEGER_FIELD_PRICE), + List.of(INTEGER_FIELD_PRICE_3_VALUE), + List.of(KEYWORD_FIELD_1), + List.of(KEYWORD_FIELD_2_VALUE), + List.of(DATE_FIELD_1), + List.of(DATE_FIELD_3_VALUE) + ); + addKnnDoc( + indexName, + "4", + List.of(), + List.of(), + Collections.singletonList(TEST_TEXT_FIELD_NAME_1), + Collections.singletonList(TEST_DOC_TEXT4), + List.of(TEST_NESTED_TYPE_FIELD_NAME_1), + List.of(Map.of(NESTED_FIELD_1, NESTED_FIELD_1_VALUE_4, NESTED_FIELD_2, NESTED_FIELD_2_VALUE_4)), + List.of(INTEGER_FIELD_1, INTEGER_FIELD_PRICE), + List.of(INTEGER_FIELD_3_VALUE, INTEGER_FIELD_PRICE_4_VALUE), + List.of(KEYWORD_FIELD_1), + List.of(KEYWORD_FIELD_3_VALUE), + List.of(DATE_FIELD_1), + List.of(DATE_FIELD_2_VALUE) + ); + addKnnDoc( + indexName, + "5", + List.of(), + List.of(), + Collections.singletonList(TEST_TEXT_FIELD_NAME_1), + Collections.singletonList(TEST_DOC_TEXT5), + List.of(), + List.of(), + List.of(INTEGER_FIELD_1, INTEGER_FIELD_PRICE), + List.of(INTEGER_FIELD_3_VALUE, INTEGER_FIELD_PRICE_5_VALUE), + List.of(KEYWORD_FIELD_1), + List.of(KEYWORD_FIELD_4_VALUE), + List.of(DATE_FIELD_1), + List.of(DATE_FIELD_4_VALUE) + ); + addKnnDoc( + indexName, + "6", + List.of(), + List.of(), + Collections.singletonList(TEST_TEXT_FIELD_NAME_1), + Collections.singletonList(TEST_DOC_TEXT6), + List.of(TEST_NESTED_TYPE_FIELD_NAME_1), + List.of(Map.of(NESTED_FIELD_1, NESTED_FIELD_1_VALUE_5, NESTED_FIELD_2, NESTED_FIELD_2_VALUE_5)), + List.of(INTEGER_FIELD_1, INTEGER_FIELD_PRICE), + List.of(INTEGER_FIELD_4_VALUE, 
INTEGER_FIELD_PRICE_6_VALUE), + List.of(KEYWORD_FIELD_1), + List.of(KEYWORD_FIELD_4_VALUE), + List.of(DATE_FIELD_1), + List.of(DATE_FIELD_4_VALUE) + ); + } + } + + protected void assertHitResultsFromQuery(int expected, Map searchResponseAsMap) { + assertEquals(expected, getHitCount(searchResponseAsMap)); + + List> hits1NestedList = getNestedHits(searchResponseAsMap); + List ids = new ArrayList<>(); + List scores = new ArrayList<>(); + for (Map oneHit : hits1NestedList) { + ids.add((String) oneHit.get("_id")); + scores.add((Double) oneHit.get("_score")); + } + + // verify that scores are in desc order + assertTrue(IntStream.range(0, scores.size() - 1).noneMatch(idx -> scores.get(idx) < scores.get(idx + 1))); + // verify that all ids are unique + assertEquals(Set.copyOf(ids).size(), ids.size()); + + Map total = getTotalHits(searchResponseAsMap); + assertNotNull(total.get("value")); + assertEquals(expected, total.get("value")); + assertNotNull(total.get("relation")); + assertEquals(RELATION_EQUAL_TO, total.get("relation")); + } +} diff --git a/src/test/java/org/opensearch/neuralsearch/query/aggregation/BucketAggregationsWithHybridQueryIT.java b/src/test/java/org/opensearch/neuralsearch/query/aggregation/BucketAggregationsWithHybridQueryIT.java new file mode 100644 index 000000000..345d805a8 --- /dev/null +++ b/src/test/java/org/opensearch/neuralsearch/query/aggregation/BucketAggregationsWithHybridQueryIT.java @@ -0,0 +1,815 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.neuralsearch.query.aggregation; + +import lombok.SneakyThrows; +import org.opensearch.index.query.QueryBuilder; +import org.opensearch.index.query.QueryBuilders; +import org.opensearch.index.query.TermQueryBuilder; +import org.opensearch.neuralsearch.query.HybridQueryBuilder; +import org.opensearch.script.Script; +import org.opensearch.search.aggregations.AggregationBuilder; +import 
org.opensearch.search.aggregations.AggregationBuilders; +import org.opensearch.search.aggregations.AggregatorFactories; +import org.opensearch.search.aggregations.PipelineAggregatorBuilders; +import org.opensearch.search.aggregations.bucket.histogram.DateHistogramInterval; +import org.opensearch.search.aggregations.pipeline.AvgBucketPipelineAggregationBuilder; +import org.opensearch.search.aggregations.pipeline.BucketMetricsPipelineAggregationBuilder; +import org.opensearch.search.aggregations.pipeline.MaxBucketPipelineAggregationBuilder; +import org.opensearch.search.aggregations.pipeline.MinBucketPipelineAggregationBuilder; +import org.opensearch.search.aggregations.pipeline.StatsBucketPipelineAggregationBuilder; +import org.opensearch.search.aggregations.pipeline.SumBucketPipelineAggregationBuilder; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import static org.opensearch.neuralsearch.TestUtils.DELTA_FOR_SCORE_ASSERTION; +import static org.opensearch.neuralsearch.util.AggregationsTestUtils.getAggregationBuckets; +import static org.opensearch.neuralsearch.util.AggregationsTestUtils.getAggregationValue; +import static org.opensearch.neuralsearch.util.AggregationsTestUtils.getAggregationValues; +import static org.opensearch.neuralsearch.util.AggregationsTestUtils.getAggregations; + +/** + * Integration tests for bucket type aggregations when they are bundled with hybrid query + * Below is list of aggregations that are present in this test: + * - Adjacency matrix + * - Diversified sampler + * - Date histogram + * - Nested + * - Filter + * - Global + * - Sampler + * - Histogram + * - Significant terms + * - Terms + * + * Following aggs are tested by other integ tests: + * - Date range + * + * Below aggregations are not part of any test: + * - Filters + * - Geodistance + * - Geohash grid + * - Geohex grid + * - Geotile grid + * - IP range + * - Missing + * - Multi-terms + * - Range + * - Reverse nested + * - Significant text + */ 
+public class BucketAggregationsWithHybridQueryIT extends BaseAggregationsWithHybridQueryIT { + private static final String TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS = "test-aggs-bucket-multi-doc-index-multiple-shards"; + private static final String SEARCH_PIPELINE = "search-pipeline-bucket-aggs"; + + @SneakyThrows + public void testBucketAndNestedAggs_whenAdjacencyMatrix_thenSuccessful() { + updateClusterSettings(CLUSTER_SETTING_CONCURRENT_SEGMENT_SEARCH, false); + testAdjacencyMatrixAggs(); + } + + @SneakyThrows + public void testWithConcurrentSegmentSearch_whenAdjacencyMatrix_thenSuccessful() { + updateClusterSettings(CLUSTER_SETTING_CONCURRENT_SEGMENT_SEARCH, true); + testAdjacencyMatrixAggs(); + } + + @SneakyThrows + public void testBucketAndNestedAggs_whenDiversifiedSampler_thenSuccessful() { + updateClusterSettings(CLUSTER_SETTING_CONCURRENT_SEGMENT_SEARCH, false); + testDiversifiedSampler(); + } + + @SneakyThrows + public void testWithConcurrentSegmentSearch_whenDiversifiedSampler_thenFail() { + updateClusterSettings(CLUSTER_SETTING_CONCURRENT_SEGMENT_SEARCH, true); + + testDiversifiedSampler(); + } + + @SneakyThrows + public void testBucketAndNestedAggs_whenAvgNestedIntoFilter_thenSuccessful() { + updateClusterSettings(CLUSTER_SETTING_CONCURRENT_SEGMENT_SEARCH, false); + testAvgNestedIntoFilter(); + } + + @SneakyThrows + public void testWithConcurrentSegmentSearch_whenAvgNestedIntoFilter_thenSuccessful() { + updateClusterSettings(CLUSTER_SETTING_CONCURRENT_SEGMENT_SEARCH, true); + testAvgNestedIntoFilter(); + } + + @SneakyThrows + public void testBucketAndNestedAggs_whenSumNestedIntoFilters_thenSuccessful() { + updateClusterSettings(CLUSTER_SETTING_CONCURRENT_SEGMENT_SEARCH, false); + testSumNestedIntoFilters(); + } + + @SneakyThrows + public void testWithConcurrentSegmentSearch_whenSumNestedIntoFilters_thenSuccessful() { + updateClusterSettings(CLUSTER_SETTING_CONCURRENT_SEGMENT_SEARCH, true); + testSumNestedIntoFilters(); + } + + 
@SneakyThrows + public void testBucketAggs_whenGlobalAggUsedWithQuery_thenSuccessful() { + updateClusterSettings(CLUSTER_SETTING_CONCURRENT_SEGMENT_SEARCH, false); + testGlobalAggs(); + } + + @SneakyThrows + public void testWithConcurrentSegmentSearch_whenGlobalAggUsedWithQuery_thenSuccessful() { + updateClusterSettings(CLUSTER_SETTING_CONCURRENT_SEGMENT_SEARCH, true); + testGlobalAggs(); + } + + @SneakyThrows + public void testBucketAggs_whenHistogramAgg_thenSuccessful() { + updateClusterSettings(CLUSTER_SETTING_CONCURRENT_SEGMENT_SEARCH, false); + testHistogramAggs(); + } + + @SneakyThrows + public void testWithConcurrentSegmentSearch_whenHistogramAgg_thenSuccessful() { + updateClusterSettings(CLUSTER_SETTING_CONCURRENT_SEGMENT_SEARCH, true); + testHistogramAggs(); + } + + @SneakyThrows + public void testBucketAggs_whenNestedAgg_thenSuccessful() { + updateClusterSettings(CLUSTER_SETTING_CONCURRENT_SEGMENT_SEARCH, false); + testNestedAggs(); + } + + @SneakyThrows + public void testWithConcurrentSegmentSearch_whenNestedAgg_thenSuccessful() { + updateClusterSettings(CLUSTER_SETTING_CONCURRENT_SEGMENT_SEARCH, true); + testNestedAggs(); + } + + @SneakyThrows + public void testBucketAggs_whenSamplerAgg_thenSuccessful() { + updateClusterSettings(CLUSTER_SETTING_CONCURRENT_SEGMENT_SEARCH, false); + testSampler(); + } + + @SneakyThrows + public void testWithConcurrentSegmentSearch_whenSamplerAgg_thenFail() { + updateClusterSettings(CLUSTER_SETTING_CONCURRENT_SEGMENT_SEARCH, true); + + testSampler(); + } + + @SneakyThrows + public void testPipelineSiblingAggs_whenDateBucketedSumsPipelinedToBucketMinMaxSumAvgAggs_thenSuccessful() { + updateClusterSettings(CLUSTER_SETTING_CONCURRENT_SEGMENT_SEARCH, false); + testDateBucketedSumsPipelinedToBucketMinMaxSumAvgAggs(); + } + + @SneakyThrows + public void testWithConcurrentSegmentSearch_whenDateBucketedSumsPipelinedToBucketMinMaxSumAvgAggs_thenSuccessful() { + updateClusterSettings(CLUSTER_SETTING_CONCURRENT_SEGMENT_SEARCH, true); 
+ testDateBucketedSumsPipelinedToBucketMinMaxSumAvgAggs(); + } + + @SneakyThrows + public void testPipelineSiblingAggs_whenDateBucketedSumsPipelinedToBucketStatsAggs_thenSuccessful() { + updateClusterSettings(CLUSTER_SETTING_CONCURRENT_SEGMENT_SEARCH, false); + testDateBucketedSumsPipelinedToBucketStatsAggs(); + } + + @SneakyThrows + public void testWithConcurrentSegmentSearch_whenDateBucketedSumsPipelinedToBucketStatsAggs_thenSuccessful() { + updateClusterSettings(CLUSTER_SETTING_CONCURRENT_SEGMENT_SEARCH, true); + testDateBucketedSumsPipelinedToBucketStatsAggs(); + } + + @SneakyThrows + public void testPipelineSiblingAggs_whenDateBucketedSumsPipelinedToBucketScriptAggs_thenSuccessful() { + updateClusterSettings(CLUSTER_SETTING_CONCURRENT_SEGMENT_SEARCH, false); + testDateBucketedSumsPipelinedToBucketScriptedAggs(); + } + + @SneakyThrows + public void testWithConcurrentSegmentSearch_whenDateBucketedSumsPipelinedToBucketScriptedAggs_thenSuccessful() { + updateClusterSettings(CLUSTER_SETTING_CONCURRENT_SEGMENT_SEARCH, true); + testDateBucketedSumsPipelinedToBucketScriptedAggs(); + } + + @SneakyThrows + public void testPipelineParentAggs_whenDateBucketedSumsPipelinedToBucketScriptedAggs_thenSuccessful() { + updateClusterSettings(CLUSTER_SETTING_CONCURRENT_SEGMENT_SEARCH, false); + testDateBucketedSumsPipelinedToBucketScriptedAggs(); + } + + @SneakyThrows + public void testMetricAggs_whenTermsAggs_thenSuccessful() { + updateClusterSettings(CLUSTER_SETTING_CONCURRENT_SEGMENT_SEARCH, false); + testTermsAggs(); + } + + @SneakyThrows + public void testWithConcurrentSegmentSearch_whenTermsAggs_thenSuccessful() { + updateClusterSettings(CLUSTER_SETTING_CONCURRENT_SEGMENT_SEARCH, true); + testTermsAggs(); + } + + @SneakyThrows + public void testMetricAggs_whenSignificantTermsAggs_thenSuccessful() { + updateClusterSettings(CLUSTER_SETTING_CONCURRENT_SEGMENT_SEARCH, false); + testSignificantTermsAggs(); + } + + @SneakyThrows + public void 
testWithConcurrentSegmentSearch_whenSignificantTermsAggs_thenSuccessful() { + updateClusterSettings(CLUSTER_SETTING_CONCURRENT_SEGMENT_SEARCH, true); + testSignificantTermsAggs(); + } + + private void testAvgNestedIntoFilter() throws IOException { + try { + prepareResources(TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS, SEARCH_PIPELINE); + + AggregationBuilder aggsBuilder = AggregationBuilders.filter( + GENERIC_AGGREGATION_NAME, + QueryBuilders.rangeQuery(INTEGER_FIELD_1).lte(3000) + ).subAggregation(AggregationBuilders.avg(AVG_AGGREGATION_NAME).field(INTEGER_FIELD_1)); + Map searchResponseAsMap = executeQueryAndGetAggsResults( + aggsBuilder, + TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS + ); + + Map aggregations = getAggregations(searchResponseAsMap); + assertNotNull(aggregations); + assertTrue(aggregations.containsKey(GENERIC_AGGREGATION_NAME)); + double avgValue = getAggregationValue(getAggregationValues(aggregations, GENERIC_AGGREGATION_NAME), AVG_AGGREGATION_NAME); + assertEquals(1789.5, avgValue, DELTA_FOR_SCORE_ASSERTION); + } finally { + wipeOfTestResources(TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS, null, null, SEARCH_PIPELINE); + } + } + + private void testSumNestedIntoFilters() throws IOException { + try { + prepareResources(TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS, SEARCH_PIPELINE); + + AggregationBuilder aggsBuilder = AggregationBuilders.filters( + GENERIC_AGGREGATION_NAME, + QueryBuilders.rangeQuery(INTEGER_FIELD_1).lte(3000), + QueryBuilders.termQuery(KEYWORD_FIELD_1, KEYWORD_FIELD_1_VALUE) + ).otherBucket(true).subAggregation(AggregationBuilders.sum(SUM_AGGREGATION_NAME).field(INTEGER_FIELD_1)); + Map searchResponseAsMap = executeQueryAndGetAggsResults( + aggsBuilder, + TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS + ); + + Map aggregations = getAggregations(searchResponseAsMap); + assertNotNull(aggregations); + + List> buckets = getAggregationBuckets(aggregations, GENERIC_AGGREGATION_NAME); + 
assertNotNull(buckets); + assertEquals(3, buckets.size()); + + Map firstBucket = buckets.get(0); + assertEquals(2, firstBucket.size()); + assertEquals(2, firstBucket.get(BUCKET_AGG_DOC_COUNT_FIELD)); + assertEquals(3579.0, getAggregationValue(firstBucket, SUM_AGGREGATION_NAME), DELTA_FOR_SCORE_ASSERTION); + + Map secondBucket = buckets.get(1); + assertEquals(2, secondBucket.size()); + assertEquals(1, secondBucket.get(BUCKET_AGG_DOC_COUNT_FIELD)); + assertEquals(1234.0, getAggregationValue(secondBucket, SUM_AGGREGATION_NAME), DELTA_FOR_SCORE_ASSERTION); + + Map thirdBucket = buckets.get(2); + assertEquals(2, thirdBucket.size()); + assertEquals(1, thirdBucket.get(BUCKET_AGG_DOC_COUNT_FIELD)); + assertEquals(3456.0, getAggregationValue(thirdBucket, SUM_AGGREGATION_NAME), DELTA_FOR_SCORE_ASSERTION); + } finally { + wipeOfTestResources(TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS, null, null, SEARCH_PIPELINE); + } + } + + private void testGlobalAggs() throws IOException { + try { + prepareResources(TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS, SEARCH_PIPELINE); + + TermQueryBuilder termQueryBuilder1 = QueryBuilders.termQuery(TEST_TEXT_FIELD_NAME_1, TEST_QUERY_TEXT3); + TermQueryBuilder termQueryBuilder2 = QueryBuilders.termQuery(TEST_TEXT_FIELD_NAME_1, TEST_QUERY_TEXT5); + + HybridQueryBuilder hybridQueryBuilderNeuralThenTerm = new HybridQueryBuilder(); + hybridQueryBuilderNeuralThenTerm.add(termQueryBuilder1); + hybridQueryBuilderNeuralThenTerm.add(termQueryBuilder2); + + AggregationBuilder aggsBuilder = AggregationBuilders.global(GENERIC_AGGREGATION_NAME) + .subAggregation(AggregationBuilders.sum(AVG_AGGREGATION_NAME).field(INTEGER_FIELD_1)); + + Map searchResponseAsMap = executeQueryAndGetAggsResults( + List.of(aggsBuilder), + hybridQueryBuilderNeuralThenTerm, + TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS + ); + + Map aggregations = getAggregations(searchResponseAsMap); + assertNotNull(aggregations); + 
assertTrue(aggregations.containsKey(GENERIC_AGGREGATION_NAME)); + double avgValue = getAggregationValue(getAggregationValues(aggregations, GENERIC_AGGREGATION_NAME), AVG_AGGREGATION_NAME); + assertEquals(15058.0, avgValue, DELTA_FOR_SCORE_ASSERTION); + } finally { + wipeOfTestResources(TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS, null, null, SEARCH_PIPELINE); + } + } + + private void testHistogramAggs() throws IOException { + try { + prepareResources(TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS, SEARCH_PIPELINE); + + AggregationBuilder aggsBuilder = AggregationBuilders.histogram(GENERIC_AGGREGATION_NAME) + .field(INTEGER_FIELD_PRICE) + .interval(100); + + Map searchResponseAsMap = executeQueryAndGetAggsResults( + aggsBuilder, + TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS + ); + + Map aggregations = getAggregations(searchResponseAsMap); + assertNotNull(aggregations); + + List> buckets = getAggregationBuckets(aggregations, GENERIC_AGGREGATION_NAME); + assertNotNull(buckets); + assertEquals(2, buckets.size()); + + Map firstBucket = buckets.get(0); + assertEquals(2, firstBucket.size()); + assertEquals(1, firstBucket.get(BUCKET_AGG_DOC_COUNT_FIELD)); + assertEquals(0.0, (Double) firstBucket.get(KEY), DELTA_FOR_SCORE_ASSERTION); + + Map secondBucket = buckets.get(1); + assertEquals(2, secondBucket.size()); + assertEquals(2, secondBucket.get(BUCKET_AGG_DOC_COUNT_FIELD)); + assertEquals(100.0, (Double) secondBucket.get(KEY), DELTA_FOR_SCORE_ASSERTION); + } finally { + wipeOfTestResources(TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS, null, null, SEARCH_PIPELINE); + } + } + + private void testNestedAggs() throws IOException { + try { + prepareResources(TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS, SEARCH_PIPELINE); + + AggregationBuilder aggsBuilder = AggregationBuilders.nested(GENERIC_AGGREGATION_NAME, TEST_NESTED_TYPE_FIELD_NAME_1) + .subAggregation( + AggregationBuilders.terms(BUCKETS_AGGREGATION_NAME_1) + 
.field(String.join(".", TEST_NESTED_TYPE_FIELD_NAME_1, NESTED_FIELD_1)) + ); + + Map searchResponseAsMap = executeQueryAndGetAggsResults( + aggsBuilder, + TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS + ); + + Map aggregations = getAggregations(searchResponseAsMap); + assertNotNull(aggregations); + + Map nestedAgg = getAggregationValues(aggregations, GENERIC_AGGREGATION_NAME); + assertNotNull(nestedAgg); + + assertEquals(3, nestedAgg.get(BUCKET_AGG_DOC_COUNT_FIELD)); + List> buckets = getAggregationBuckets(nestedAgg, BUCKETS_AGGREGATION_NAME_1); + + assertNotNull(buckets); + assertEquals(3, buckets.size()); + + Map firstBucket = buckets.get(0); + assertEquals(2, firstBucket.size()); + assertEquals(1, firstBucket.get(BUCKET_AGG_DOC_COUNT_FIELD)); + assertEquals(NESTED_FIELD_1_VALUE_2, firstBucket.get(KEY)); + + Map secondBucket = buckets.get(1); + assertEquals(2, secondBucket.size()); + assertEquals(1, secondBucket.get(BUCKET_AGG_DOC_COUNT_FIELD)); + assertEquals(NESTED_FIELD_1_VALUE_1, secondBucket.get(KEY)); + + Map thirdBucket = buckets.get(2); + assertEquals(2, thirdBucket.size()); + assertEquals(1, thirdBucket.get(BUCKET_AGG_DOC_COUNT_FIELD)); + assertEquals(NESTED_FIELD_1_VALUE_4, thirdBucket.get(KEY)); + } finally { + wipeOfTestResources(TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS, null, null, SEARCH_PIPELINE); + } + } + + private void testDiversifiedSampler() throws IOException { + try { + prepareResources(TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS, SEARCH_PIPELINE); + + AggregationBuilder aggsBuilder = AggregationBuilders.diversifiedSampler(GENERIC_AGGREGATION_NAME) + .field(KEYWORD_FIELD_1) + .shardSize(2) + .subAggregation(AggregationBuilders.terms(BUCKETS_AGGREGATION_NAME_1).field(KEYWORD_FIELD_1)); + + Map searchResponseAsMap = executeQueryAndGetAggsResults( + aggsBuilder, + TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS + ); + + Map aggregations = getAggregations(searchResponseAsMap); + 
assertNotNull(aggregations); + + Map aggValue = getAggregationValues(aggregations, GENERIC_AGGREGATION_NAME); + assertEquals(2, aggValue.size()); + assertEquals(3, aggValue.get(BUCKET_AGG_DOC_COUNT_FIELD)); + Map nestedAggs = getAggregationValues(aggValue, BUCKETS_AGGREGATION_NAME_1); + assertNotNull(nestedAggs); + assertEquals(0, nestedAggs.get("doc_count_error_upper_bound")); + List> buckets = getAggregationBuckets(aggValue, BUCKETS_AGGREGATION_NAME_1); + assertEquals(2, buckets.size()); + + Map firstBucket = buckets.get(0); + assertEquals(1, firstBucket.get(BUCKET_AGG_DOC_COUNT_FIELD)); + assertEquals("likeable", firstBucket.get(KEY)); + + Map secondBucket = buckets.get(1); + assertEquals(1, secondBucket.get(BUCKET_AGG_DOC_COUNT_FIELD)); + assertEquals("workable", secondBucket.get(KEY)); + } finally { + wipeOfTestResources(TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS, null, null, SEARCH_PIPELINE); + } + } + + private void testAdjacencyMatrixAggs() throws IOException { + try { + prepareResources(TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS, SEARCH_PIPELINE); + + AggregationBuilder aggsBuilder = AggregationBuilders.adjacencyMatrix( + GENERIC_AGGREGATION_NAME, + Map.of( + "grpA", + QueryBuilders.matchQuery(KEYWORD_FIELD_1, KEYWORD_FIELD_1_VALUE), + "grpB", + QueryBuilders.matchQuery(KEYWORD_FIELD_1, KEYWORD_FIELD_2_VALUE), + "grpC", + QueryBuilders.matchQuery(KEYWORD_FIELD_1, KEYWORD_FIELD_3_VALUE) + ) + ); + Map searchResponseAsMap = executeQueryAndGetAggsResults( + aggsBuilder, + TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS + ); + + Map aggregations = getAggregations(searchResponseAsMap); + assertNotNull(aggregations); + List> buckets = getAggregationBuckets(aggregations, GENERIC_AGGREGATION_NAME); + assertNotNull(buckets); + assertEquals(2, buckets.size()); + Map grpA = buckets.get(0); + assertEquals(1, grpA.get(BUCKET_AGG_DOC_COUNT_FIELD)); + assertEquals("grpA", grpA.get(KEY)); + Map grpC = buckets.get(1); + assertEquals(1, 
grpC.get(BUCKET_AGG_DOC_COUNT_FIELD)); + assertEquals("grpC", grpC.get(KEY)); + } finally { + wipeOfTestResources(TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS, null, null, SEARCH_PIPELINE); + } + } + + private void testDateBucketedSumsPipelinedToBucketMinMaxSumAvgAggs() throws IOException { + try { + prepareResources(TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS, SEARCH_PIPELINE); + + AggregationBuilder aggDateHisto = AggregationBuilders.dateHistogram(GENERIC_AGGREGATION_NAME) + .calendarInterval(DateHistogramInterval.YEAR) + .field(DATE_FIELD_1) + .subAggregation(AggregationBuilders.sum(SUM_AGGREGATION_NAME).field(INTEGER_FIELD_1)); + + BucketMetricsPipelineAggregationBuilder aggAvgBucket = PipelineAggregatorBuilders + .avgBucket(BUCKETS_AGGREGATION_NAME_1, GENERIC_AGGREGATION_NAME + ">" + SUM_AGGREGATION_NAME); + + BucketMetricsPipelineAggregationBuilder aggSumBucket = PipelineAggregatorBuilders + .sumBucket(BUCKETS_AGGREGATION_NAME_2, GENERIC_AGGREGATION_NAME + ">" + SUM_AGGREGATION_NAME); + + BucketMetricsPipelineAggregationBuilder aggMinBucket = PipelineAggregatorBuilders + .minBucket(BUCKETS_AGGREGATION_NAME_3, GENERIC_AGGREGATION_NAME + ">" + SUM_AGGREGATION_NAME); + + BucketMetricsPipelineAggregationBuilder aggMaxBucket = PipelineAggregatorBuilders + .maxBucket(BUCKETS_AGGREGATION_NAME_4, GENERIC_AGGREGATION_NAME + ">" + SUM_AGGREGATION_NAME); + + Map searchResponseAsMap = executeQueryAndGetAggsResults( + List.of(aggDateHisto, aggAvgBucket, aggSumBucket, aggMinBucket, aggMaxBucket), + TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS + ); + + assertResultsOfPipelineSumtoDateHistogramAggs(searchResponseAsMap); + } finally { + wipeOfTestResources(TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS, null, null, SEARCH_PIPELINE); + } + } + + private void assertResultsOfPipelineSumtoDateHistogramAggs(Map searchResponseAsMap) { + Map aggregations = getAggregations(searchResponseAsMap); + assertNotNull(aggregations); + + double aggValue 
= getAggregationValue(aggregations, BUCKETS_AGGREGATION_NAME_1); + assertEquals(3517.5, aggValue, DELTA_FOR_SCORE_ASSERTION); + + double sumValue = getAggregationValue(aggregations, BUCKETS_AGGREGATION_NAME_2); + assertEquals(7035.0, sumValue, DELTA_FOR_SCORE_ASSERTION); + + double minValue = getAggregationValue(aggregations, BUCKETS_AGGREGATION_NAME_3); + assertEquals(1234.0, minValue, DELTA_FOR_SCORE_ASSERTION); + + double maxValue = getAggregationValue(aggregations, BUCKETS_AGGREGATION_NAME_4); + assertEquals(5801.0, maxValue, DELTA_FOR_SCORE_ASSERTION); + + List> buckets = getAggregationBuckets(aggregations, GENERIC_AGGREGATION_NAME); + assertNotNull(buckets); + assertEquals(21, buckets.size()); + + // check content of few buckets + Map firstBucket = buckets.get(0); + assertEquals(4, firstBucket.size()); + assertEquals("01/01/1995", firstBucket.get(BUCKET_AGG_KEY_AS_STRING)); + assertEquals(1, firstBucket.get(BUCKET_AGG_DOC_COUNT_FIELD)); + assertEquals(1234.0, getAggregationValue(firstBucket, SUM_AGGREGATION_NAME), DELTA_FOR_SCORE_ASSERTION); + assertTrue(firstBucket.containsKey(KEY)); + + Map secondBucket = buckets.get(1); + assertEquals(4, secondBucket.size()); + assertEquals("01/01/1996", secondBucket.get(BUCKET_AGG_KEY_AS_STRING)); + assertEquals(0, secondBucket.get(BUCKET_AGG_DOC_COUNT_FIELD)); + assertEquals(0.0, getAggregationValue(secondBucket, SUM_AGGREGATION_NAME), DELTA_FOR_SCORE_ASSERTION); + assertTrue(secondBucket.containsKey(KEY)); + + Map lastBucket = buckets.get(buckets.size() - 1); + assertEquals(4, lastBucket.size()); + assertEquals("01/01/2015", lastBucket.get(BUCKET_AGG_KEY_AS_STRING)); + assertEquals(2, lastBucket.get(BUCKET_AGG_DOC_COUNT_FIELD)); + assertEquals(5801.0, getAggregationValue(lastBucket, SUM_AGGREGATION_NAME), DELTA_FOR_SCORE_ASSERTION); + assertTrue(lastBucket.containsKey(KEY)); + } + + private void testDateBucketedSumsPipelinedToBucketStatsAggs() throws IOException { + try { + 
prepareResources(TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS, SEARCH_PIPELINE); + + AggregationBuilder aggDateHisto = AggregationBuilders.dateHistogram(GENERIC_AGGREGATION_NAME) + .calendarInterval(DateHistogramInterval.YEAR) + .field(DATE_FIELD_1) + .subAggregation(AggregationBuilders.sum(SUM_AGGREGATION_NAME).field(INTEGER_FIELD_1)); + + StatsBucketPipelineAggregationBuilder aggStatsBucket = PipelineAggregatorBuilders.statsBucket( + BUCKETS_AGGREGATION_NAME_1, + GENERIC_AGGREGATION_NAME + ">" + SUM_AGGREGATION_NAME + ); + + Map searchResponseAsMap = executeQueryAndGetAggsResults( + List.of(aggDateHisto, aggStatsBucket), + TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS + ); + + Map aggregations = getAggregations(searchResponseAsMap); + assertNotNull(aggregations); + + Map statsAggs = getAggregationValues(aggregations, BUCKETS_AGGREGATION_NAME_1); + + assertNotNull(statsAggs); + + assertEquals(3517.5, (Double) statsAggs.get("avg"), DELTA_FOR_SCORE_ASSERTION); + assertEquals(7035.0, (Double) statsAggs.get("sum"), DELTA_FOR_SCORE_ASSERTION); + assertEquals(1234.0, (Double) statsAggs.get("min"), DELTA_FOR_SCORE_ASSERTION); + assertEquals(5801.0, (Double) statsAggs.get("max"), DELTA_FOR_SCORE_ASSERTION); + assertEquals(2, (int) statsAggs.get("count")); + } finally { + wipeOfTestResources(TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS, null, null, SEARCH_PIPELINE); + } + } + + private void testDateBucketedSumsPipelinedToBucketScriptedAggs() throws IOException { + try { + prepareResources(TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS, SEARCH_PIPELINE); + + AggregationBuilder aggBuilder = AggregationBuilders.dateHistogram(DATE_AGGREGATION_NAME) + .calendarInterval(DateHistogramInterval.YEAR) + .field(DATE_FIELD_1) + .subAggregations( + new AggregatorFactories.Builder().addAggregator(AggregationBuilders.sum(SUM_AGGREGATION_NAME).field(INTEGER_FIELD_1)) + .addAggregator( + AggregationBuilders.filter( + GENERIC_AGGREGATION_NAME, + 
QueryBuilders.boolQuery() + .should( + QueryBuilders.boolQuery() + .should(QueryBuilders.termQuery(KEYWORD_FIELD_1, KEYWORD_FIELD_1_VALUE)) + .should(QueryBuilders.termQuery(KEYWORD_FIELD_1, KEYWORD_FIELD_2_VALUE)) + ) + .should(QueryBuilders.boolQuery().mustNot(QueryBuilders.existsQuery(KEYWORD_FIELD_1))) + ).subAggregation(AggregationBuilders.sum(SUM_AGGREGATION_NAME_2).field(INTEGER_FIELD_PRICE)) + ) + .addPipelineAggregator( + PipelineAggregatorBuilders.bucketScript( + BUCKETS_AGGREGATION_NAME_1, + Map.of("docNum", GENERIC_AGGREGATION_NAME + ">" + SUM_AGGREGATION_NAME_2, "totalNum", SUM_AGGREGATION_NAME), + new Script("params.docNum / params.totalNum") + ) + ) + ); + + Map searchResponseAsMap = executeQueryAndGetAggsResults( + aggBuilder, + TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS + ); + + Map aggregations = getAggregations(searchResponseAsMap); + assertNotNull(aggregations); + + List> buckets = getAggregationBuckets(aggregations, DATE_AGGREGATION_NAME); + + assertNotNull(buckets); + assertEquals(21, buckets.size()); + + // check content of few buckets + // first bucket have all the aggs values + Map firstBucket = buckets.get(0); + assertEquals(6, firstBucket.size()); + assertEquals("01/01/1995", firstBucket.get(BUCKET_AGG_KEY_AS_STRING)); + assertEquals(1, firstBucket.get(BUCKET_AGG_DOC_COUNT_FIELD)); + assertEquals(0.1053, getAggregationValue(firstBucket, BUCKETS_AGGREGATION_NAME_1), DELTA_FOR_SCORE_ASSERTION); + assertEquals(1234.0, getAggregationValue(firstBucket, SUM_AGGREGATION_NAME), DELTA_FOR_SCORE_ASSERTION); + assertTrue(firstBucket.containsKey(KEY)); + + Map inBucketAggValues = getAggregationValues(firstBucket, GENERIC_AGGREGATION_NAME); + assertNotNull(inBucketAggValues); + assertEquals(1, inBucketAggValues.get(BUCKET_AGG_DOC_COUNT_FIELD)); + assertEquals(130.0, getAggregationValue(inBucketAggValues, SUM_AGGREGATION_NAME_2), DELTA_FOR_SCORE_ASSERTION); + + // second bucket is empty + Map secondBucket = buckets.get(1); + assertEquals(5, 
secondBucket.size()); + assertEquals("01/01/1996", secondBucket.get(BUCKET_AGG_KEY_AS_STRING)); + assertEquals(0, secondBucket.get(BUCKET_AGG_DOC_COUNT_FIELD)); + assertFalse(secondBucket.containsKey(BUCKETS_AGGREGATION_NAME_1)); + assertEquals(0.0, getAggregationValue(secondBucket, SUM_AGGREGATION_NAME), DELTA_FOR_SCORE_ASSERTION); + assertTrue(secondBucket.containsKey(KEY)); + + Map inSecondBucketAggValues = getAggregationValues(secondBucket, GENERIC_AGGREGATION_NAME); + assertNotNull(inSecondBucketAggValues); + assertEquals(0, inSecondBucketAggValues.get(BUCKET_AGG_DOC_COUNT_FIELD)); + assertEquals(0.0, getAggregationValue(inSecondBucketAggValues, SUM_AGGREGATION_NAME_2), DELTA_FOR_SCORE_ASSERTION); + + // last bucket has values + Map lastBucket = buckets.get(buckets.size() - 1); + assertEquals(6, lastBucket.size()); + assertEquals("01/01/2015", lastBucket.get(BUCKET_AGG_KEY_AS_STRING)); + assertEquals(2, lastBucket.get(BUCKET_AGG_DOC_COUNT_FIELD)); + assertEquals(0.0172, getAggregationValue(lastBucket, BUCKETS_AGGREGATION_NAME_1), DELTA_FOR_SCORE_ASSERTION); + assertEquals(5801.0, getAggregationValue(lastBucket, SUM_AGGREGATION_NAME), DELTA_FOR_SCORE_ASSERTION); + assertTrue(lastBucket.containsKey(KEY)); + + Map inLastBucketAggValues = getAggregationValues(lastBucket, GENERIC_AGGREGATION_NAME); + assertNotNull(inLastBucketAggValues); + assertEquals(1, inLastBucketAggValues.get(BUCKET_AGG_DOC_COUNT_FIELD)); + assertEquals(100.0, getAggregationValue(inLastBucketAggValues, SUM_AGGREGATION_NAME_2), DELTA_FOR_SCORE_ASSERTION); + } finally { + wipeOfTestResources(TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS, null, null, SEARCH_PIPELINE); + } + } + + private void testSampler() throws IOException { + try { + prepareResources(TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS, SEARCH_PIPELINE); + + AggregationBuilder aggsBuilder = AggregationBuilders.sampler(GENERIC_AGGREGATION_NAME) + .shardSize(2) + 
.subAggregation(AggregationBuilders.terms(BUCKETS_AGGREGATION_NAME_1).field(KEYWORD_FIELD_1)); + + Map searchResponseAsMap = executeQueryAndGetAggsResults( + aggsBuilder, + TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS + ); + + Map aggregations = getAggregations(searchResponseAsMap); + assertNotNull(aggregations); + + Map aggValue = getAggregationValues(aggregations, GENERIC_AGGREGATION_NAME); + assertEquals(2, aggValue.size()); + assertEquals(3, aggValue.get(BUCKET_AGG_DOC_COUNT_FIELD)); + Map nestedAggs = getAggregationValues(aggValue, BUCKETS_AGGREGATION_NAME_1); + assertNotNull(nestedAggs); + assertEquals(0, nestedAggs.get("doc_count_error_upper_bound")); + List> buckets = getAggregationBuckets(aggValue, BUCKETS_AGGREGATION_NAME_1); + assertEquals(2, buckets.size()); + + Map firstBucket = buckets.get(0); + assertEquals(1, firstBucket.get(BUCKET_AGG_DOC_COUNT_FIELD)); + assertEquals("likeable", firstBucket.get(KEY)); + + Map secondBucket = buckets.get(1); + assertEquals(1, secondBucket.get(BUCKET_AGG_DOC_COUNT_FIELD)); + assertEquals("workable", secondBucket.get(KEY)); + } finally { + wipeOfTestResources(TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS, null, null, SEARCH_PIPELINE); + } + } + + private void testTermsAggs() throws IOException { + try { + prepareResources(TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS, SEARCH_PIPELINE); + + AggregationBuilder aggsBuilder = AggregationBuilders.terms(GENERIC_AGGREGATION_NAME).field(KEYWORD_FIELD_1); + Map searchResponseAsMap = executeQueryAndGetAggsResults( + aggsBuilder, + TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS + ); + + Map aggregations = getAggregations(searchResponseAsMap); + assertNotNull(aggregations); + List> buckets = ((Map) getAggregationValues(aggregations, GENERIC_AGGREGATION_NAME)).get( + "buckets" + ); + assertNotNull(buckets); + assertEquals(2, buckets.size()); + Map firstBucket = buckets.get(0); + assertEquals(1, firstBucket.get(BUCKET_AGG_DOC_COUNT_FIELD)); + 
assertEquals(KEYWORD_FIELD_3_VALUE, firstBucket.get(KEY)); + Map secondBucket = buckets.get(1); + assertEquals(1, secondBucket.get(BUCKET_AGG_DOC_COUNT_FIELD)); + assertEquals(KEYWORD_FIELD_1_VALUE, secondBucket.get(KEY)); + } finally { + wipeOfTestResources(TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS, null, null, SEARCH_PIPELINE); + } + } + + private void testSignificantTermsAggs() throws IOException { + try { + prepareResources(TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS, SEARCH_PIPELINE); + + AggregationBuilder aggsBuilder = AggregationBuilders.significantTerms(GENERIC_AGGREGATION_NAME).field(KEYWORD_FIELD_1); + Map searchResponseAsMap = executeQueryAndGetAggsResults( + aggsBuilder, + TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS + ); + + Map aggregations = getAggregations(searchResponseAsMap); + assertNotNull(aggregations); + List> buckets = getAggregationBuckets(aggregations, GENERIC_AGGREGATION_NAME); + assertNotNull(buckets); + + Map significantTermsAggregations = getAggregationValues(aggregations, GENERIC_AGGREGATION_NAME); + + assertNotNull(significantTermsAggregations); + assertEquals(3, (int) getAggregationValues(significantTermsAggregations, BUCKET_AGG_DOC_COUNT_FIELD)); + assertEquals(11, (int) getAggregationValues(significantTermsAggregations, "bg_count")); + } finally { + wipeOfTestResources(TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS, null, null, SEARCH_PIPELINE); + } + } + + private Map executeQueryAndGetAggsResults(final Object aggsBuilder, String indexName) { + return executeQueryAndGetAggsResults(List.of(aggsBuilder), indexName); + } + + private Map executeQueryAndGetAggsResults( + final List aggsBuilders, + QueryBuilder queryBuilder, + String indexName, + int expectedHits + ) { + initializeIndexIfNotExist(indexName); + + Map searchResponseAsMap = search( + indexName, + queryBuilder, + null, + 10, + Map.of("search_pipeline", SEARCH_PIPELINE), + aggsBuilders + ); + + 
assertHitResultsFromQuery(expectedHits, searchResponseAsMap); + return searchResponseAsMap; + } + + private Map executeQueryAndGetAggsResults( + final List aggsBuilders, + QueryBuilder queryBuilder, + String indexName + ) { + return executeQueryAndGetAggsResults(aggsBuilders, queryBuilder, indexName, 3); + } + + private Map executeQueryAndGetAggsResults(final List aggsBuilders, String indexName) { + + TermQueryBuilder termQueryBuilder1 = QueryBuilders.termQuery(TEST_TEXT_FIELD_NAME_1, TEST_QUERY_TEXT3); + TermQueryBuilder termQueryBuilder2 = QueryBuilders.termQuery(TEST_TEXT_FIELD_NAME_1, TEST_QUERY_TEXT5); + + HybridQueryBuilder hybridQueryBuilderNeuralThenTerm = new HybridQueryBuilder(); + hybridQueryBuilderNeuralThenTerm.add(termQueryBuilder1); + hybridQueryBuilderNeuralThenTerm.add(termQueryBuilder2); + + return executeQueryAndGetAggsResults(aggsBuilders, hybridQueryBuilderNeuralThenTerm, indexName); + } +} diff --git a/src/test/java/org/opensearch/neuralsearch/query/aggregation/MetricAggregationsWithHybridQueryIT.java b/src/test/java/org/opensearch/neuralsearch/query/aggregation/MetricAggregationsWithHybridQueryIT.java new file mode 100644 index 000000000..b40e94f2a --- /dev/null +++ b/src/test/java/org/opensearch/neuralsearch/query/aggregation/MetricAggregationsWithHybridQueryIT.java @@ -0,0 +1,489 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.neuralsearch.query.aggregation; + +import lombok.SneakyThrows; +import org.opensearch.index.query.QueryBuilder; +import org.opensearch.index.query.QueryBuilders; +import org.opensearch.index.query.TermQueryBuilder; +import org.opensearch.neuralsearch.query.HybridQueryBuilder; +import org.opensearch.script.Script; +import org.opensearch.search.aggregations.AggregationBuilder; +import org.opensearch.search.aggregations.AggregationBuilders; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import 
static org.opensearch.neuralsearch.TestUtils.DELTA_FOR_SCORE_ASSERTION; +import static org.opensearch.neuralsearch.util.AggregationsTestUtils.getAggregationValue; +import static org.opensearch.neuralsearch.util.AggregationsTestUtils.getAggregationValues; +import static org.opensearch.neuralsearch.util.AggregationsTestUtils.getAggregations; +import static org.opensearch.neuralsearch.util.AggregationsTestUtils.getNestedHits; + +/** + * Integration tests for metric type aggregations when they are bundled with hybrid query + * Below is the list of metric aggregations that are present in this test: + * - Average + * - Cardinality + * - Extended stats + * - Top hits + * - Percentile + * - Percentile ranks + * - Scripted metric + * - Sum + * - Value count + * + * Following metric aggs are tested by other integ tests: + * - Maximum + * + * + * Below aggregations are not part of any test: + * - Geobounds + * - Matrix stats + * - Minimum + * - Stats + */ +public class MetricAggregationsWithHybridQueryIT extends BaseAggregationsWithHybridQueryIT { + private static final String TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS = "test-aggs-metric-multi-doc-index-multiple-shards"; + private static final String SEARCH_PIPELINE = "search-pipeline-metric-aggs"; + + /** + * Tests avg aggregation bundled with a hybrid query of two term sub-queries, with concurrent segment search enabled. Request shape (NOTE(review): the example below shows a max aggregation while the test runs avg - confirm and update the example): + * { + * "query": { + * "hybrid": { + * "queries": [ + * { + * "term": { + * "text": "word1" + * } + * }, + * { + * "term": { + * "text": "word3" + * } + * } + * ] + * } + * }, + * "aggs": { + * "max_index": { + * "max": { + * "field": "doc_index" + * } + * } + * } + * } + */ + @SneakyThrows + public void testWithConcurrentSegmentSearch_whenAvgAggs_thenSuccessful() { + updateClusterSettings(CLUSTER_SETTING_CONCURRENT_SEGMENT_SEARCH, true); + testAvgAggs(); + } + + @SneakyThrows + public void testMetricAggs_whenCardinalityAggs_thenSuccessful() { + updateClusterSettings(CLUSTER_SETTING_CONCURRENT_SEGMENT_SEARCH, false); + testCardinalityAggs(); + } + 
@SneakyThrows + public void testWithConcurrentSegmentSearch_whenCardinalityAggs_thenSuccessful() { + updateClusterSettings(CLUSTER_SETTING_CONCURRENT_SEGMENT_SEARCH, true); + testCardinalityAggs(); + } + + @SneakyThrows + public void testMetricAggs_whenExtendedStatsAggs_thenSuccessful() { + updateClusterSettings(CLUSTER_SETTING_CONCURRENT_SEGMENT_SEARCH, false); + testExtendedStatsAggs(); + } + + @SneakyThrows + public void testWithConcurrentSegmentSearch_whenExtendedStatsAggs_thenSuccessful() { + updateClusterSettings(CLUSTER_SETTING_CONCURRENT_SEGMENT_SEARCH, true); + testExtendedStatsAggs(); + } + + @SneakyThrows + public void testMetricAggs_whenTopHitsAggs_thenSuccessful() { + updateClusterSettings(CLUSTER_SETTING_CONCURRENT_SEGMENT_SEARCH, false); + testTopHitsAggs(); + } + + @SneakyThrows + public void testWithConcurrentSegmentSearch_whenTopHitsAggs_thenSuccessful() { + updateClusterSettings(CLUSTER_SETTING_CONCURRENT_SEGMENT_SEARCH, true); + testTopHitsAggs(); + } + + @SneakyThrows + public void testMetricAggs_whenPercentileRank_thenSuccessful() { + updateClusterSettings(CLUSTER_SETTING_CONCURRENT_SEGMENT_SEARCH, false); + testPercentileRankAggs(); + } + + @SneakyThrows + public void testWithConcurrentSegmentSearch_whenPercentileRank_thenSuccessful() { + updateClusterSettings(CLUSTER_SETTING_CONCURRENT_SEGMENT_SEARCH, true); + testPercentileRankAggs(); + } + + @SneakyThrows + public void testMetricAggs_whenPercentile_thenSuccessful() { + updateClusterSettings(CLUSTER_SETTING_CONCURRENT_SEGMENT_SEARCH, false); + testPercentileAggs(); + } + + @SneakyThrows + public void testWithConcurrentSegmentSearch_whenPercentile_thenSuccessful() { + updateClusterSettings(CLUSTER_SETTING_CONCURRENT_SEGMENT_SEARCH, true); + testPercentileAggs(); + } + + @SneakyThrows + public void testMetricAggs_whenScriptedMetrics_thenSuccessful() { + updateClusterSettings(CLUSTER_SETTING_CONCURRENT_SEGMENT_SEARCH, false); + testScriptedMetricsAggs(); + } + + @SneakyThrows + public void 
testWithConcurrentSegmentSearch_whenScriptedMetrics_thenSuccessful() { + updateClusterSettings(CLUSTER_SETTING_CONCURRENT_SEGMENT_SEARCH, true); + testScriptedMetricsAggs(); + } + + @SneakyThrows + public void testMetricAggs_whenSumAgg_thenSuccessful() { + updateClusterSettings(CLUSTER_SETTING_CONCURRENT_SEGMENT_SEARCH, false); + testSumAggs(); + } + + @SneakyThrows + public void testWithConcurrentSegmentSearch_whenSumAgg_thenSuccessful() { + updateClusterSettings(CLUSTER_SETTING_CONCURRENT_SEGMENT_SEARCH, true); + testSumAggs(); + } + + @SneakyThrows + public void testMetricAggs_whenValueCount_thenSuccessful() { + updateClusterSettings(CLUSTER_SETTING_CONCURRENT_SEGMENT_SEARCH, false); + testValueCountAggs(); + } + + @SneakyThrows + public void testWithConcurrentSegmentSearch_whenValueCount_thenSuccessful() { + updateClusterSettings(CLUSTER_SETTING_CONCURRENT_SEGMENT_SEARCH, true); + testValueCountAggs(); + } + + private void testAvgAggs() throws IOException { + try { + prepareResources(TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS, SEARCH_PIPELINE); + + AggregationBuilder aggsBuilder = AggregationBuilders.avg(AVG_AGGREGATION_NAME).field(INTEGER_FIELD_1); + Map searchResponseAsMap = executeQueryAndGetAggsResults( + aggsBuilder, + TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS + ); + + Map aggregations = getAggregations(searchResponseAsMap); + assertNotNull(aggregations); + assertTrue(aggregations.containsKey(AVG_AGGREGATION_NAME)); + double maxAggsValue = getAggregationValue(aggregations, AVG_AGGREGATION_NAME); + assertEquals(maxAggsValue, 2345.0, DELTA_FOR_SCORE_ASSERTION); + } finally { + wipeOfTestResources(TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS, null, null, SEARCH_PIPELINE); + } + } + + private void testCardinalityAggs() throws IOException { + try { + prepareResources(TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS, SEARCH_PIPELINE); + AggregationBuilder aggsBuilder = 
AggregationBuilders.cardinality(GENERIC_AGGREGATION_NAME).field(INTEGER_FIELD_1); + Map searchResponseAsMap = executeQueryAndGetAggsResults( + aggsBuilder, + TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS + ); + + Map aggregations = getAggregations(searchResponseAsMap); + assertNotNull(aggregations); + assertTrue(aggregations.containsKey(GENERIC_AGGREGATION_NAME)); + int aggsValue = getAggregationValue(aggregations, GENERIC_AGGREGATION_NAME); + assertEquals(aggsValue, 3); + } finally { + wipeOfTestResources(TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS, null, null, SEARCH_PIPELINE); + } + } + + private void testExtendedStatsAggs() throws IOException { + try { + prepareResources(TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS, SEARCH_PIPELINE); + AggregationBuilder aggsBuilder = AggregationBuilders.extendedStats(GENERIC_AGGREGATION_NAME).field(INTEGER_FIELD_1); + Map searchResponseAsMap = executeQueryAndGetAggsResults( + aggsBuilder, + TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS + ); + + Map aggregations = getAggregations(searchResponseAsMap); + assertNotNull(aggregations); + assertTrue(aggregations.containsKey(GENERIC_AGGREGATION_NAME)); + Map extendedStatsValues = getAggregationValues(aggregations, GENERIC_AGGREGATION_NAME); + assertNotNull(extendedStatsValues); + + assertEquals((double) extendedStatsValues.get("max"), 3456.0, DELTA_FOR_SCORE_ASSERTION); + assertEquals((int) extendedStatsValues.get("count"), 3); + assertEquals((double) extendedStatsValues.get("sum"), 7035.0, DELTA_FOR_SCORE_ASSERTION); + assertEquals((double) extendedStatsValues.get("avg"), 2345.0, DELTA_FOR_SCORE_ASSERTION); + assertEquals((double) extendedStatsValues.get("variance"), 822880.666, DELTA_FOR_SCORE_ASSERTION); + assertEquals((double) extendedStatsValues.get("std_deviation"), 907.127, DELTA_FOR_SCORE_ASSERTION); + } finally { + wipeOfTestResources(TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS, null, null, SEARCH_PIPELINE); + } + } + + 
private void testTopHitsAggs() throws IOException { + try { + prepareResources(TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS, SEARCH_PIPELINE); + AggregationBuilder aggsBuilder = AggregationBuilders.topHits(GENERIC_AGGREGATION_NAME).size(4); + Map searchResponseAsMap = executeQueryAndGetAggsResults( + aggsBuilder, + TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS + ); + + Map aggregations = getAggregations(searchResponseAsMap); + assertNotNull(aggregations); + assertTrue(aggregations.containsKey(GENERIC_AGGREGATION_NAME)); + Map aggsValues = getAggregationValues(aggregations, GENERIC_AGGREGATION_NAME); + assertNotNull(aggsValues); + assertHitResultsFromQuery(3, aggsValues); + } finally { + wipeOfTestResources(TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS, null, null, SEARCH_PIPELINE); + } + } + + private void testScriptedMetricsAggs() throws IOException { + try { + prepareResources(TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS, SEARCH_PIPELINE); + // compute sum of all int fields that are not blank + AggregationBuilder aggsBuilder = AggregationBuilders.scriptedMetric(GENERIC_AGGREGATION_NAME) + .initScript(new Script("state.price = []")) + .mapScript( + new Script("state.price.add(doc[\"" + INTEGER_FIELD_1 + "\"].size() == 0 ? 0 : doc." 
+ INTEGER_FIELD_1 + ".value)") + ) + .combineScript(new Script("state.price.stream().mapToInt(Integer::intValue).sum()")) + .reduceScript(new Script("states.stream().mapToInt(Integer::intValue).sum()")); + Map searchResponseAsMap = executeQueryAndGetAggsResults( + aggsBuilder, + TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS + ); + + Map aggregations = getAggregations(searchResponseAsMap); + assertNotNull(aggregations); + assertTrue(aggregations.containsKey(GENERIC_AGGREGATION_NAME)); + int aggsValue = getAggregationValue(aggregations, GENERIC_AGGREGATION_NAME); + assertEquals(7035, aggsValue); + } finally { + wipeOfTestResources(TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS, null, null, SEARCH_PIPELINE); + } + } + + private void testPercentileAggs() throws IOException { + try { + prepareResources(TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS, SEARCH_PIPELINE); + AggregationBuilder aggsBuilder = AggregationBuilders.percentiles(GENERIC_AGGREGATION_NAME).field(INTEGER_FIELD_1); + Map searchResponseAsMap = executeQueryAndGetAggsResults( + aggsBuilder, + TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS + ); + + assertHitResultsFromQuery(3, searchResponseAsMap); + + Map aggregations = getAggregations(searchResponseAsMap); + assertNotNull(aggregations); + assertTrue(aggregations.containsKey(GENERIC_AGGREGATION_NAME)); + Map> aggsValues = getAggregationValues(aggregations, GENERIC_AGGREGATION_NAME); + assertNotNull(aggsValues); + + Map values = aggsValues.get("values"); + assertNotNull(values); + assertEquals(7, values.size()); + assertEquals(1234.0, values.get("1.0"), DELTA_FOR_SCORE_ASSERTION); + assertEquals(1234.0, values.get("5.0"), DELTA_FOR_SCORE_ASSERTION); + assertEquals(1234.0, values.get("25.0"), DELTA_FOR_SCORE_ASSERTION); + assertEquals(2345.0, values.get("50.0"), DELTA_FOR_SCORE_ASSERTION); + assertEquals(3456.0, values.get("75.0"), DELTA_FOR_SCORE_ASSERTION); + assertEquals(3456.0, values.get("95.0"), 
DELTA_FOR_SCORE_ASSERTION); + assertEquals(3456.0, values.get("99.0"), DELTA_FOR_SCORE_ASSERTION); + } finally { + wipeOfTestResources(TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS, null, null, SEARCH_PIPELINE); + } + } + + private void testPercentileRankAggs() throws IOException { + try { + prepareResources(TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS, SEARCH_PIPELINE); + AggregationBuilder aggsBuilder = AggregationBuilders.percentileRanks(GENERIC_AGGREGATION_NAME, new double[] { 2000, 3000 }) + .field(INTEGER_FIELD_1); + Map searchResponseAsMap = executeQueryAndGetAggsResults( + aggsBuilder, + TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS + ); + + assertHitResultsFromQuery(3, searchResponseAsMap); + + Map aggregations = getAggregations(searchResponseAsMap); + assertNotNull(aggregations); + assertTrue(aggregations.containsKey(GENERIC_AGGREGATION_NAME)); + Map> aggsValues = getAggregationValues(aggregations, GENERIC_AGGREGATION_NAME); + assertNotNull(aggsValues); + Map values = aggsValues.get("values"); + assertNotNull(values); + assertEquals(33.333, values.get("2000.0"), DELTA_FOR_SCORE_ASSERTION); + assertEquals(66.666, values.get("3000.0"), DELTA_FOR_SCORE_ASSERTION); + } finally { + wipeOfTestResources(TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS, null, null, SEARCH_PIPELINE); + } + } + + private void testSumAggs() throws IOException { + try { + prepareResources(TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS, SEARCH_PIPELINE); + + AggregationBuilder aggsBuilder = AggregationBuilders.sum(SUM_AGGREGATION_NAME).field(INTEGER_FIELD_1); + Map searchResponseAsMap = executeQueryAndGetAggsResults( + aggsBuilder, + TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS + ); + + Map aggregations = getAggregations(searchResponseAsMap); + assertNotNull(aggregations); + assertTrue(aggregations.containsKey(SUM_AGGREGATION_NAME)); + double maxAggsValue = getAggregationValue(aggregations, SUM_AGGREGATION_NAME); + 
assertEquals(7035.0, maxAggsValue, DELTA_FOR_SCORE_ASSERTION); + } finally { + wipeOfTestResources(TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS, null, null, SEARCH_PIPELINE); + } + } + + private void testValueCountAggs() throws IOException { + try { + prepareResources(TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS, SEARCH_PIPELINE); + AggregationBuilder aggsBuilder = AggregationBuilders.count(GENERIC_AGGREGATION_NAME).field(INTEGER_FIELD_1); + Map searchResponseAsMap = executeQueryAndGetAggsResults( + aggsBuilder, + TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS + ); + + assertHitResultsFromQuery(3, searchResponseAsMap); + + Map aggregations = getAggregations(searchResponseAsMap); + assertNotNull(aggregations); + + assertTrue(aggregations.containsKey(GENERIC_AGGREGATION_NAME)); + assertEquals(3, (int) getAggregationValue(aggregations, GENERIC_AGGREGATION_NAME)); + } finally { + wipeOfTestResources(TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS, null, null, SEARCH_PIPELINE); + } + } + + private void testSumAggsAndRangePostFilter() throws IOException { + try { + prepareResources(TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS, SEARCH_PIPELINE); + + AggregationBuilder aggsBuilder = AggregationBuilders.sum(SUM_AGGREGATION_NAME).field(INTEGER_FIELD_1); + + TermQueryBuilder termQueryBuilder1 = QueryBuilders.termQuery(TEST_TEXT_FIELD_NAME_1, TEST_QUERY_TEXT3); + TermQueryBuilder termQueryBuilder2 = QueryBuilders.termQuery(TEST_TEXT_FIELD_NAME_1, TEST_QUERY_TEXT4); + TermQueryBuilder termQueryBuilder3 = QueryBuilders.termQuery(TEST_TEXT_FIELD_NAME_1, TEST_QUERY_TEXT5); + + HybridQueryBuilder hybridQueryBuilderNeuralThenTerm = new HybridQueryBuilder(); + hybridQueryBuilderNeuralThenTerm.add(termQueryBuilder1); + hybridQueryBuilderNeuralThenTerm.add(termQueryBuilder2); + hybridQueryBuilderNeuralThenTerm.add(termQueryBuilder3); + + QueryBuilder rangeFilterQuery = QueryBuilders.rangeQuery(INTEGER_FIELD_1).gte(3000).lte(5000); + + Map 
searchResponseAsMap = search( + TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS, + hybridQueryBuilderNeuralThenTerm, + null, + 10, + Map.of("search_pipeline", SEARCH_PIPELINE), + List.of(aggsBuilder), + rangeFilterQuery + ); + + Map aggregations = getAggregations(searchResponseAsMap); + assertNotNull(aggregations); + assertTrue(aggregations.containsKey(SUM_AGGREGATION_NAME)); + double maxAggsValue = getAggregationValue(aggregations, SUM_AGGREGATION_NAME); + assertEquals(11602.0, maxAggsValue, DELTA_FOR_SCORE_ASSERTION); + + assertHitResultsFromQuery(2, searchResponseAsMap); + + // assert post-filter + List> hitsNestedList = getNestedHits(searchResponseAsMap); + + List docIndexes = new ArrayList<>(); + for (Map oneHit : hitsNestedList) { + assertNotNull(oneHit.get("_source")); + Map source = (Map) oneHit.get("_source"); + int docIndex = (int) source.get(INTEGER_FIELD_1); + docIndexes.add(docIndex); + } + assertEquals(0, docIndexes.stream().filter(docIndex -> docIndex < 3000 || docIndex > 5000).count()); + } finally { + wipeOfTestResources(TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS, null, null, SEARCH_PIPELINE); + } + } + + private Map executeQueryAndGetAggsResults(final Object aggsBuilder, String indexName) { + return executeQueryAndGetAggsResults(List.of(aggsBuilder), indexName); + } + + private Map executeQueryAndGetAggsResults( + final List aggsBuilders, + QueryBuilder queryBuilder, + String indexName, + int expectedHits + ) { + initializeIndexIfNotExist(indexName); + + Map searchResponseAsMap = search( + indexName, + queryBuilder, + null, + 10, + Map.of("search_pipeline", SEARCH_PIPELINE), + aggsBuilders + ); + + assertHitResultsFromQuery(expectedHits, searchResponseAsMap); + return searchResponseAsMap; + } + + private Map executeQueryAndGetAggsResults( + final List aggsBuilders, + QueryBuilder queryBuilder, + String indexName + ) { + return executeQueryAndGetAggsResults(aggsBuilders, queryBuilder, indexName, 3); + } + + private Map 
executeQueryAndGetAggsResults(final List aggsBuilders, String indexName) { + + TermQueryBuilder termQueryBuilder1 = QueryBuilders.termQuery(TEST_TEXT_FIELD_NAME_1, TEST_QUERY_TEXT3); + TermQueryBuilder termQueryBuilder2 = QueryBuilders.termQuery(TEST_TEXT_FIELD_NAME_1, TEST_QUERY_TEXT5); + + HybridQueryBuilder hybridQueryBuilderNeuralThenTerm = new HybridQueryBuilder(); + hybridQueryBuilderNeuralThenTerm.add(termQueryBuilder1); + hybridQueryBuilderNeuralThenTerm.add(termQueryBuilder2); + + return executeQueryAndGetAggsResults(aggsBuilders, hybridQueryBuilderNeuralThenTerm, indexName); + } +} diff --git a/src/test/java/org/opensearch/neuralsearch/query/aggregation/PipelineAggregationsWithHybridQueryIT.java b/src/test/java/org/opensearch/neuralsearch/query/aggregation/PipelineAggregationsWithHybridQueryIT.java new file mode 100644 index 000000000..2e41d6e96 --- /dev/null +++ b/src/test/java/org/opensearch/neuralsearch/query/aggregation/PipelineAggregationsWithHybridQueryIT.java @@ -0,0 +1,403 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.neuralsearch.query.aggregation; + +import lombok.SneakyThrows; +import org.opensearch.index.query.QueryBuilder; +import org.opensearch.index.query.QueryBuilders; +import org.opensearch.index.query.TermQueryBuilder; +import org.opensearch.neuralsearch.query.HybridQueryBuilder; +import org.opensearch.script.Script; +import org.opensearch.search.aggregations.AggregationBuilder; +import org.opensearch.search.aggregations.AggregationBuilders; +import org.opensearch.search.aggregations.AggregatorFactories; +import org.opensearch.search.aggregations.PipelineAggregatorBuilders; +import org.opensearch.search.aggregations.bucket.histogram.DateHistogramInterval; +import org.opensearch.search.aggregations.pipeline.StatsBucketPipelineAggregationBuilder; +import org.opensearch.search.sort.FieldSortBuilder; +import org.opensearch.search.sort.SortOrder; + +import 
java.io.IOException;
import java.util.List;
import java.util.Map;

import static org.opensearch.neuralsearch.TestUtils.DELTA_FOR_SCORE_ASSERTION;
import static org.opensearch.neuralsearch.util.AggregationsTestUtils.getAggregationBuckets;
import static org.opensearch.neuralsearch.util.AggregationsTestUtils.getAggregationValue;
import static org.opensearch.neuralsearch.util.AggregationsTestUtils.getAggregationValues;
import static org.opensearch.neuralsearch.util.AggregationsTestUtils.getAggregations;

/**
 * Integration tests for pipeline type aggregations when they are bundled with hybrid query
 * Below is list of aggregations that are present in this test:
 * - stats_bucket
 * - bucket_script
 * - bucket_sort
 * - cumulative_sum
 *
 * Following metric aggs are tested by other integ tests:
 * - min_bucket
 * - max_bucket
 * - sum_bucket
 * - avg_bucket
 *
 * Below aggregations are not part of any test:
 * - derivative
 * - moving_avg
 * - serial_diff
 *
 * Every scenario is executed twice: once with the concurrent segment search cluster setting
 * switched on and once with it switched off.
 */
public class PipelineAggregationsWithHybridQueryIT extends BaseAggregationsWithHybridQueryIT {
    private static final String TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS =
        "test-aggs-pipeline-multi-doc-index-multiple-shards";
    private static final String SEARCH_PIPELINE = "search-pipeline-pipeline-aggs";

    // ---- stats_bucket over yearly sums ----

    @SneakyThrows
    public void testWithConcurrentSegmentSearch_whenDateBucketedSumsPipelinedToBucketStatsAggs_thenSuccessful() {
        updateClusterSettings(CLUSTER_SETTING_CONCURRENT_SEGMENT_SEARCH, true);
        testDateBucketedSumsPipelinedToBucketStatsAggs();
    }

    @SneakyThrows
    public void testPipelineSiblingAggs_whenDateBucketedSumsPipelinedToBucketStatsAggs_thenSuccessful() {
        updateClusterSettings(CLUSTER_SETTING_CONCURRENT_SEGMENT_SEARCH, false);
        testDateBucketedSumsPipelinedToBucketStatsAggs();
    }

    // ---- bucket_script over yearly sums ----

    @SneakyThrows
    public void testWithConcurrentSegmentSearch_whenDateBucketedSumsPipelinedToBucketScriptedAggs_thenSuccessful() {
        updateClusterSettings(CLUSTER_SETTING_CONCURRENT_SEGMENT_SEARCH, true);
        testDateBucketedSumsPipelinedToBucketScriptedAggs();
    }

    @SneakyThrows
    public void testPipelineParentAggs_whenDateBucketedSumsPipelinedToBucketScriptedAggs_thenSuccessful() {
        updateClusterSettings(CLUSTER_SETTING_CONCURRENT_SEGMENT_SEARCH, false);
        testDateBucketedSumsPipelinedToBucketScriptedAggs();
    }

    // ---- bucket_sort over yearly sums ----

    @SneakyThrows
    public void testWithConcurrentSegmentSearch_whenDateBucketedSumsPipelinedToBucketSortAggs_thenSuccessful() {
        updateClusterSettings(CLUSTER_SETTING_CONCURRENT_SEGMENT_SEARCH, true);
        testDateBucketedSumsPipelinedToBucketSortAggs();
    }

    @SneakyThrows
    public void testPipelineParentAggs_whenDateBucketedSumsPipelinedToBucketSortAggs_thenSuccessful() {
        updateClusterSettings(CLUSTER_SETTING_CONCURRENT_SEGMENT_SEARCH, false);
        testDateBucketedSumsPipelinedToBucketSortAggs();
    }

    // ---- cumulative_sum over yearly sums ----

    @SneakyThrows
    public void testWithConcurrentSegmentSearch_whenDateBucketedSumsPipelinedToCumulativeSumAggs_thenSuccessful() {
        updateClusterSettings(CLUSTER_SETTING_CONCURRENT_SEGMENT_SEARCH, true);
        testDateBucketedSumsPipelinedToCumulativeSumAggs();
    }

    @SneakyThrows
    public void testPipelineParentAggs_whenDateBucketedSumsPipelinedToCumulativeSumAggs_thenSuccessful() {
        updateClusterSettings(CLUSTER_SETTING_CONCURRENT_SEGMENT_SEARCH, false);
        testDateBucketedSumsPipelinedToCumulativeSumAggs();
    }

    /**
     * Buckets documents per calendar year, sums {@code INTEGER_FIELD_1} in every bucket and feeds
     * those sums into a sibling stats_bucket aggregation. Asserts avg, sum, min, max and count of
     * the resulting statistics.
     *
     * @throws IOException declared for the resource setup and cleanup helpers
     */
    private void testDateBucketedSumsPipelinedToBucketStatsAggs() throws IOException {
        try {
            prepareResources(TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS, SEARCH_PIPELINE);

            AggregationBuilder aggDateHisto = AggregationBuilders.dateHistogram(GENERIC_AGGREGATION_NAME)
                .calendarInterval(DateHistogramInterval.YEAR)
                .field(DATE_FIELD_1)
                .subAggregation(AggregationBuilders.sum(SUM_AGGREGATION_NAME).field(INTEGER_FIELD_1));

            // buckets path "<histogram>><sum>" points the pipeline at the per-bucket sums
            StatsBucketPipelineAggregationBuilder aggStatsBucket = PipelineAggregatorBuilders.statsBucket(
                BUCKETS_AGGREGATION_NAME_1,
                GENERIC_AGGREGATION_NAME + ">" + SUM_AGGREGATION_NAME
            );

            Map<String, Object> searchResponseAsMap = executeQueryAndGetAggsResults(
                List.of(aggDateHisto, aggStatsBucket),
                TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS
            );

            Map<String, Object> aggregations = getAggregations(searchResponseAsMap);
            assertNotNull(aggregations);

            Map<String, Object> statsAggs = getAggregationValues(aggregations, BUCKETS_AGGREGATION_NAME_1);

            assertNotNull(statsAggs);

            assertEquals(3517.5, (Double) statsAggs.get("avg"), DELTA_FOR_SCORE_ASSERTION);
            assertEquals(7035.0, (Double) statsAggs.get("sum"), DELTA_FOR_SCORE_ASSERTION);
            assertEquals(1234.0, (Double) statsAggs.get("min"), DELTA_FOR_SCORE_ASSERTION);
            assertEquals(5801.0, (Double) statsAggs.get("max"), DELTA_FOR_SCORE_ASSERTION);
            assertEquals(2, (int) statsAggs.get("count"));
        } finally {
            wipeOfTestResources(TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS, null, null, SEARCH_PIPELINE);
        }
    }

    /**
     * Buckets documents per calendar year and, inside every bucket, computes a total sum of
     * {@code INTEGER_FIELD_1}, a filtered sum of {@code INTEGER_FIELD_PRICE}, and a bucket_script
     * that divides the filtered sum by the total. Asserts the content of the first, second and
     * last of the 21 returned buckets.
     *
     * @throws IOException declared for the resource setup and cleanup helpers
     */
    private void testDateBucketedSumsPipelinedToBucketScriptedAggs() throws IOException {
        try {
            prepareResources(TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS, SEARCH_PIPELINE);

            AggregationBuilder aggBuilder = AggregationBuilders.dateHistogram(DATE_AGGREGATION_NAME)
                .calendarInterval(DateHistogramInterval.YEAR)
                .field(DATE_FIELD_1)
                .subAggregations(
                    new AggregatorFactories.Builder().addAggregator(AggregationBuilders.sum(SUM_AGGREGATION_NAME).field(INTEGER_FIELD_1))
                        .addAggregator(
                            AggregationBuilders.filter(GENERIC_AGGREGATION_NAME, keywordMatchesOrMissingQuery())
                                .subAggregation(AggregationBuilders.sum(SUM_AGGREGATION_NAME_2).field(INTEGER_FIELD_PRICE))
                        )
                        .addPipelineAggregator(
                            PipelineAggregatorBuilders.bucketScript(
                                BUCKETS_AGGREGATION_NAME_1,
                                Map.of("docNum", GENERIC_AGGREGATION_NAME + ">" + SUM_AGGREGATION_NAME_2, "totalNum", SUM_AGGREGATION_NAME),
                                new Script("params.docNum / params.totalNum")
                            )
                        )
                );

            Map<String, Object> searchResponseAsMap = executeQueryAndGetAggsResults(
                aggBuilder,
                TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS
            );

            Map<String, Object> aggregations = getAggregations(searchResponseAsMap);
            assertNotNull(aggregations);

            List<Map<String, Object>> buckets = getAggregationBuckets(aggregations, DATE_AGGREGATION_NAME);

            assertNotNull(buckets);
            assertEquals(21, buckets.size());

            // check content of a few buckets
            // first bucket has all the aggs values, including the script result
            Map<String, Object> firstBucket = buckets.get(0);
            assertEquals(6, firstBucket.size());
            assertEquals("01/01/1995", firstBucket.get(BUCKET_AGG_KEY_AS_STRING));
            assertEquals(1, firstBucket.get(BUCKET_AGG_DOC_COUNT_FIELD));
            assertEquals(0.1053, getAggregationValue(firstBucket, BUCKETS_AGGREGATION_NAME_1), DELTA_FOR_SCORE_ASSERTION);
            assertEquals(1234.0, getAggregationValue(firstBucket, SUM_AGGREGATION_NAME), DELTA_FOR_SCORE_ASSERTION);
            assertTrue(firstBucket.containsKey(KEY));

            Map<String, Object> inBucketAggValues = getAggregationValues(firstBucket, GENERIC_AGGREGATION_NAME);
            assertNotNull(inBucketAggValues);
            assertEquals(1, inBucketAggValues.get(BUCKET_AGG_DOC_COUNT_FIELD));
            assertEquals(130.0, getAggregationValue(inBucketAggValues, SUM_AGGREGATION_NAME_2), DELTA_FOR_SCORE_ASSERTION);

            // second bucket is empty, so the script result is absent and it has one key less
            Map<String, Object> secondBucket = buckets.get(1);
            assertEquals(5, secondBucket.size());
            assertEquals("01/01/1996", secondBucket.get(BUCKET_AGG_KEY_AS_STRING));
            assertEquals(0, secondBucket.get(BUCKET_AGG_DOC_COUNT_FIELD));
            assertFalse(secondBucket.containsKey(BUCKETS_AGGREGATION_NAME_1));
            assertEquals(0.0, getAggregationValue(secondBucket, SUM_AGGREGATION_NAME), DELTA_FOR_SCORE_ASSERTION);
            assertTrue(secondBucket.containsKey(KEY));

            Map<String, Object> inSecondBucketAggValues = getAggregationValues(secondBucket, GENERIC_AGGREGATION_NAME);
            assertNotNull(inSecondBucketAggValues);
            assertEquals(0, inSecondBucketAggValues.get(BUCKET_AGG_DOC_COUNT_FIELD));
            assertEquals(0.0, getAggregationValue(inSecondBucketAggValues, SUM_AGGREGATION_NAME_2), DELTA_FOR_SCORE_ASSERTION);

            // last bucket has values
            Map<String, Object> lastBucket = buckets.get(buckets.size() - 1);
            assertEquals(6, lastBucket.size());
            assertEquals("01/01/2015", lastBucket.get(BUCKET_AGG_KEY_AS_STRING));
            assertEquals(2, lastBucket.get(BUCKET_AGG_DOC_COUNT_FIELD));
            assertEquals(0.0172, getAggregationValue(lastBucket, BUCKETS_AGGREGATION_NAME_1), DELTA_FOR_SCORE_ASSERTION);
            assertEquals(5801.0, getAggregationValue(lastBucket, SUM_AGGREGATION_NAME), DELTA_FOR_SCORE_ASSERTION);
            assertTrue(lastBucket.containsKey(KEY));

            Map<String, Object> inLastBucketAggValues = getAggregationValues(lastBucket, GENERIC_AGGREGATION_NAME);
            assertNotNull(inLastBucketAggValues);
            assertEquals(1, inLastBucketAggValues.get(BUCKET_AGG_DOC_COUNT_FIELD));
            assertEquals(100.0, getAggregationValue(inLastBucketAggValues, SUM_AGGREGATION_NAME_2), DELTA_FOR_SCORE_ASSERTION);
        } finally {
            wipeOfTestResources(TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS, null, null, SEARCH_PIPELINE);
        }
    }

    /**
     * Buckets documents per calendar year, sums {@code INTEGER_FIELD_1} per bucket and applies a
     * bucket_sort (descending by that sum, size 5). The search uses the keyword bool query
     * instead of the default hybrid query. Asserts that three buckets come back ordered by
     * descending sum.
     *
     * @throws IOException declared for the resource setup and cleanup helpers
     */
    private void testDateBucketedSumsPipelinedToBucketSortAggs() throws IOException {
        try {
            prepareResources(TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS, SEARCH_PIPELINE);

            AggregationBuilder aggBuilder = AggregationBuilders.dateHistogram(DATE_AGGREGATION_NAME)
                .calendarInterval(DateHistogramInterval.YEAR)
                .field(DATE_FIELD_1)
                .subAggregations(
                    new AggregatorFactories.Builder().addAggregator(AggregationBuilders.sum(SUM_AGGREGATION_NAME).field(INTEGER_FIELD_1))
                        .addPipelineAggregator(
                            PipelineAggregatorBuilders.bucketSort(
                                BUCKETS_AGGREGATION_NAME_1,
                                List.of(new FieldSortBuilder(SUM_AGGREGATION_NAME).order(SortOrder.DESC))
                            ).size(5)
                        )
                );

            QueryBuilder queryBuilder = keywordMatchesOrMissingQuery();

            Map<String, Object> searchResponseAsMap = executeQueryAndGetAggsResults(
                List.of(aggBuilder),
                queryBuilder,
                TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS
            );

            Map<String, Object> aggregations = getAggregations(searchResponseAsMap);
            assertNotNull(aggregations);

            List<Map<String, Object>> buckets = getAggregationBuckets(aggregations, DATE_AGGREGATION_NAME);

            assertNotNull(buckets);
            assertEquals(3, buckets.size());

            // buckets are expected in descending order of their sum
            // first bucket carries the largest sum
            Map<String, Object> firstBucket = buckets.get(0);
            assertEquals(4, firstBucket.size());
            assertEquals("01/01/2015", firstBucket.get(BUCKET_AGG_KEY_AS_STRING));
            assertEquals(1, firstBucket.get(BUCKET_AGG_DOC_COUNT_FIELD));
            assertEquals(2345.0, getAggregationValue(firstBucket, SUM_AGGREGATION_NAME), DELTA_FOR_SCORE_ASSERTION);
            assertTrue(firstBucket.containsKey(KEY));

            // second bucket holds one document and the next largest sum
            Map<String, Object> secondBucket = buckets.get(1);
            assertEquals(4, secondBucket.size());
            assertEquals("01/01/1995", secondBucket.get(BUCKET_AGG_KEY_AS_STRING));
            assertEquals(1, secondBucket.get(BUCKET_AGG_DOC_COUNT_FIELD));
            assertEquals(1234.0, getAggregationValue(secondBucket, SUM_AGGREGATION_NAME), DELTA_FOR_SCORE_ASSERTION);
            assertTrue(secondBucket.containsKey(KEY));

            // last bucket holds one document but its sum is 0.0
            Map<String, Object> lastBucket = buckets.get(buckets.size() - 1);
            assertEquals(4, lastBucket.size());
            assertEquals("01/01/2007", lastBucket.get(BUCKET_AGG_KEY_AS_STRING));
            assertEquals(1, lastBucket.get(BUCKET_AGG_DOC_COUNT_FIELD));
            assertEquals(0.0, getAggregationValue(lastBucket, SUM_AGGREGATION_NAME), DELTA_FOR_SCORE_ASSERTION);
            assertTrue(lastBucket.containsKey(KEY));
        } finally {
            wipeOfTestResources(TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS, null, null, SEARCH_PIPELINE);
        }
    }

    /**
     * Buckets documents per calendar year, sums {@code INTEGER_FIELD_1} per bucket and adds a
     * cumulative_sum over those sums. The search uses the keyword bool query instead of the
     * default hybrid query. Asserts the per-bucket sum and the running total for the first,
     * second and last of the 21 returned buckets.
     *
     * @throws IOException declared for the resource setup and cleanup helpers
     */
    private void testDateBucketedSumsPipelinedToCumulativeSumAggs() throws IOException {
        try {
            prepareResources(TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS, SEARCH_PIPELINE);

            AggregationBuilder aggBuilder = AggregationBuilders.dateHistogram(DATE_AGGREGATION_NAME)
                .calendarInterval(DateHistogramInterval.YEAR)
                .field(DATE_FIELD_1)
                .subAggregations(
                    new AggregatorFactories.Builder().addAggregator(AggregationBuilders.sum(SUM_AGGREGATION_NAME).field(INTEGER_FIELD_1))
                        .addPipelineAggregator(PipelineAggregatorBuilders.cumulativeSum(BUCKETS_AGGREGATION_NAME_1, SUM_AGGREGATION_NAME))
                );

            QueryBuilder queryBuilder = keywordMatchesOrMissingQuery();

            Map<String, Object> searchResponseAsMap = executeQueryAndGetAggsResults(
                List.of(aggBuilder),
                queryBuilder,
                TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS
            );

            Map<String, Object> aggregations = getAggregations(searchResponseAsMap);
            assertNotNull(aggregations);

            List<Map<String, Object>> buckets = getAggregationBuckets(aggregations, DATE_AGGREGATION_NAME);

            assertNotNull(buckets);
            assertEquals(21, buckets.size());

            // check content of a few buckets
            // first bucket: running total equals its own sum
            Map<String, Object> firstBucket = buckets.get(0);
            assertEquals(5, firstBucket.size());
            assertEquals("01/01/1995", firstBucket.get(BUCKET_AGG_KEY_AS_STRING));
            assertEquals(1, firstBucket.get(BUCKET_AGG_DOC_COUNT_FIELD));
            assertEquals(1234.0, getAggregationValue(firstBucket, SUM_AGGREGATION_NAME), DELTA_FOR_SCORE_ASSERTION);
            assertEquals(1234.0, getAggregationValue(firstBucket, BUCKETS_AGGREGATION_NAME_1), DELTA_FOR_SCORE_ASSERTION);
            assertTrue(firstBucket.containsKey(KEY));

            // second bucket is empty, the running total is carried over unchanged
            Map<String, Object> secondBucket = buckets.get(1);
            assertEquals(5, secondBucket.size());
            assertEquals("01/01/1996", secondBucket.get(BUCKET_AGG_KEY_AS_STRING));
            assertEquals(0, secondBucket.get(BUCKET_AGG_DOC_COUNT_FIELD));
            assertEquals(0.0, getAggregationValue(secondBucket, SUM_AGGREGATION_NAME), DELTA_FOR_SCORE_ASSERTION);
            assertEquals(1234.0, getAggregationValue(secondBucket, BUCKETS_AGGREGATION_NAME_1), DELTA_FOR_SCORE_ASSERTION);
            assertTrue(secondBucket.containsKey(KEY));

            // last bucket holds one document; running total is 1234.0 + 2345.0
            Map<String, Object> lastBucket = buckets.get(buckets.size() - 1);
            assertEquals(5, lastBucket.size());
            assertEquals("01/01/2015", lastBucket.get(BUCKET_AGG_KEY_AS_STRING));
            assertEquals(1, lastBucket.get(BUCKET_AGG_DOC_COUNT_FIELD));
            assertEquals(2345.0, getAggregationValue(lastBucket, SUM_AGGREGATION_NAME), DELTA_FOR_SCORE_ASSERTION);
            assertEquals(3579.0, getAggregationValue(lastBucket, BUCKETS_AGGREGATION_NAME_1), DELTA_FOR_SCORE_ASSERTION);
            assertTrue(lastBucket.containsKey(KEY));
        } finally {
            wipeOfTestResources(TEST_MULTI_DOC_INDEX_WITH_TEXT_AND_INT_MULTIPLE_SHARDS, null, null, SEARCH_PIPELINE);
        }
    }

    /**
     * Builds the bool query shared by several scenarios: matches documents whose
     * {@code KEYWORD_FIELD_1} equals {@code KEYWORD_FIELD_1_VALUE} or {@code KEYWORD_FIELD_2_VALUE},
     * or documents that have no {@code KEYWORD_FIELD_1} at all.
     *
     * @return a freshly built query; callers may use it as a search query or as a filter
     */
    private QueryBuilder keywordMatchesOrMissingQuery() {
        return QueryBuilders.boolQuery()
            .should(
                QueryBuilders.boolQuery()
                    .should(QueryBuilders.termQuery(KEYWORD_FIELD_1, KEYWORD_FIELD_1_VALUE))
                    .should(QueryBuilders.termQuery(KEYWORD_FIELD_1, KEYWORD_FIELD_2_VALUE))
            )
            .should(QueryBuilders.boolQuery().mustNot(QueryBuilders.existsQuery(KEYWORD_FIELD_1)));
    }

    /**
     * Convenience overload for a single aggregation: wraps it into a list and runs the default
     * hybrid query.
     *
     * @param aggsBuilder aggregation (or pipeline aggregation) builder to send with the query
     * @param indexName   index to search
     * @return search response as a map
     */
    private Map<String, Object> executeQueryAndGetAggsResults(final Object aggsBuilder, String indexName) {
        return executeQueryAndGetAggsResults(List.of(aggsBuilder), indexName);
    }

    /**
     * Makes sure the index exists, runs the query with the given aggregations through
     * {@code SEARCH_PIPELINE} (result size 10) and asserts the number of hits.
     *
     * @param aggsBuilders aggregation builders to send with the query
     * @param queryBuilder query to execute
     * @param indexName    index to search
     * @param expectedHits number of hits the response must contain
     * @return search response as a map
     */
    private Map<String, Object> executeQueryAndGetAggsResults(
        final List<Object> aggsBuilders,
        QueryBuilder queryBuilder,
        String indexName,
        int expectedHits
    ) {
        initializeIndexIfNotExist(indexName);

        Map<String, Object> searchResponseAsMap = search(
            indexName,
            queryBuilder,
            null,
            10,
            Map.of("search_pipeline", SEARCH_PIPELINE),
            aggsBuilders
        );

        assertHitResultsFromQuery(expectedHits, searchResponseAsMap);
        return searchResponseAsMap;
    }

    /**
     * Same as the four-argument overload with the expected hit count fixed to 3.
     *
     * @param aggsBuilders aggregation builders to send with the query
     * @param queryBuilder query to execute
     * @param indexName    index to search
     * @return search response as a map
     */
    private Map<String, Object> executeQueryAndGetAggsResults(
        final List<Object> aggsBuilders,
        QueryBuilder queryBuilder,
        String indexName
    ) {
        return executeQueryAndGetAggsResults(aggsBuilders, queryBuilder, indexName, 3);
    }

    /**
     * Runs the default hybrid query (two term sub-queries on {@code TEST_TEXT_FIELD_NAME_1} for
     * {@code TEST_QUERY_TEXT3} and {@code TEST_QUERY_TEXT5}) with the given aggregations.
     *
     * @param aggsBuilders aggregation builders to send with the query
     * @param indexName    index to search
     * @return search response as a map
     */
    private Map<String, Object> executeQueryAndGetAggsResults(final List<Object> aggsBuilders, String indexName) {

        TermQueryBuilder termQueryBuilder1 = QueryBuilders.termQuery(TEST_TEXT_FIELD_NAME_1, TEST_QUERY_TEXT3);
        TermQueryBuilder termQueryBuilder2 = QueryBuilders.termQuery(TEST_TEXT_FIELD_NAME_1, TEST_QUERY_TEXT5);

        HybridQueryBuilder hybridQueryBuilder = new HybridQueryBuilder();
        hybridQueryBuilder.add(termQueryBuilder1);
        hybridQueryBuilder.add(termQueryBuilder2);

        return executeQueryAndGetAggsResults(aggsBuilders, hybridQueryBuilder, indexName);
    }
}