From 1dadf25fa1747b48d290919ee55f1cd5b73bd7c2 Mon Sep 17 00:00:00 2001 From: Varun Jain Date: Fri, 26 Jan 2024 22:43:11 +0530 Subject: [PATCH] BWC tests for Multimodal Search, Hybrid Search and Neural Sparse Search (#533) * Initial commit of BWC Test Signed-off-by: Varun Jain --- ...backwards_compatibility_tests_workflow.yml | 9 +- CHANGELOG.md | 1 + qa/restart-upgrade/build.gradle | 18 + .../AbstractRestartUpgradeRestTestCase.java | 44 ++- .../neuralsearch/bwc/HybridSearchIT.java | 129 +++++++ .../neuralsearch/bwc/MultiModalSearchIT.java | 61 ++++ .../bwc/NeuralSparseSearchIT.java | 90 +++++ .../neuralsearch/bwc/SemanticSearchIT.java | 49 +-- ...gs.json => IndexMappingMultipleShard.json} | 4 +- .../processor/IndexMappingSingleShard.json | 34 ++ ...rSparseEncodingProcessorConfiguration.json | 13 + ...ineForTextImageProcessorConfiguration.json | 15 + .../processor/SparseIndexMappings.json | 17 + .../UploadSparseEncodingModelRequestBody.json | 10 + qa/rolling-upgrade/build.gradle | 40 ++ .../bwc/AbstractRollingUpgradeTestCase.java | 52 ++- .../neuralsearch/bwc/HybridSearchIT.java | 121 ++++++ .../neuralsearch/bwc/MultiModalSearchIT.java | 83 +++++ .../bwc/NeuralSparseSearchIT.java | 120 ++++++ .../neuralsearch/bwc/SemanticSearchIT.java | 62 +--- .../resources/processor/IndexMappings.json | 4 +- ...rSparseEncodingProcessorConfiguration.json | 13 + ...ineForTextImageProcessorConfiguration.json | 15 + .../processor/SparseIndexMappings.json | 17 + .../UploadSparseEncodingModelRequestBody.json | 10 + .../NeuralQueryEnricherProcessorIT.java | 4 +- .../processor/ScoreCombinationIT.java | 3 + .../processor/ScoreNormalizationIT.java | 3 + .../processor/SparseEncodingProcessIT.java | 6 +- .../processor/TextEmbeddingProcessorIT.java | 2 +- .../query/NeuralSparseQueryIT.java | 6 +- .../neuralsearch/BaseNeuralSearchIT.java | 345 +++++++++++++++--- .../neuralsearch/BaseSparseEncodingIT.java | 138 ------- .../opensearch/neuralsearch/TestUtils.java | 21 +- 34 files changed, 1259 insertions(+), 300 deletions(-) create mode 100644 qa/restart-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/HybridSearchIT.java create mode 100644 qa/restart-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/MultiModalSearchIT.java create mode 100644 qa/restart-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/NeuralSparseSearchIT.java rename qa/restart-upgrade/src/test/resources/processor/{IndexMappings.json => IndexMappingMultipleShard.json} (91%) create mode 100644 qa/restart-upgrade/src/test/resources/processor/IndexMappingSingleShard.json create mode 100644 qa/restart-upgrade/src/test/resources/processor/PipelineForSparseEncodingProcessorConfiguration.json create mode 100644 qa/restart-upgrade/src/test/resources/processor/PipelineForTextImageProcessorConfiguration.json create mode 100644 qa/restart-upgrade/src/test/resources/processor/SparseIndexMappings.json create mode 100644 qa/restart-upgrade/src/test/resources/processor/UploadSparseEncodingModelRequestBody.json create mode 100644 qa/rolling-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/HybridSearchIT.java create mode 100644 qa/rolling-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/MultiModalSearchIT.java create mode 100644 qa/rolling-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/NeuralSparseSearchIT.java create mode 100644 qa/rolling-upgrade/src/test/resources/processor/PipelineForSparseEncodingProcessorConfiguration.json create mode 100644 qa/rolling-upgrade/src/test/resources/processor/PipelineForTextImageProcessorConfiguration.json 
create mode 100644 qa/rolling-upgrade/src/test/resources/processor/SparseIndexMappings.json create mode 100644 qa/rolling-upgrade/src/test/resources/processor/UploadSparseEncodingModelRequestBody.json delete mode 100644 src/testFixtures/java/org/opensearch/neuralsearch/BaseSparseEncodingIT.java diff --git a/.github/workflows/backwards_compatibility_tests_workflow.yml b/.github/workflows/backwards_compatibility_tests_workflow.yml index 9b34b6356..82b70dcf0 100644 --- a/.github/workflows/backwards_compatibility_tests_workflow.yml +++ b/.github/workflows/backwards_compatibility_tests_workflow.yml @@ -36,13 +36,13 @@ jobs: - name: Run NeuralSearch Restart-Upgrade BWC Tests from BWCVersion-${{ matrix.bwc_version }} to OpenSearch Version-${{ matrix.opensearch_version }} on ${{matrix.os}} run: | echo "Running restart-upgrade backwards compatibility tests ..." -# Disabling BWC tests due to ongoing build failure. https://github.com/opensearch-project/neural-search/issues/536 -# ./gradlew :qa:restart-upgrade:testAgainstNewCluster -D'tests.bwc.version=${{ matrix.bwc_version }}' + ./gradlew :qa:restart-upgrade:testAgainstNewCluster -D'tests.bwc.version=${{ matrix.bwc_version }}' Rolling-Upgrade-BWCTests-NeuralSearch: strategy: matrix: - java: [ 11, 17, 21 ] + # Restricting java 21 to 21.0.1 due to an ongoing bug in JDK 21.0.2 https://bugs.openjdk.org/browse/JDK-8323659. Once the fix https://github.com/opensearch-project/OpenSearch/pull/11968 is merged, this change will be reverted. + java: [ 11, 17, 21.0.1 ] os: [ubuntu-latest,windows-latest] bwc_version: [ "2.12.0-SNAPSHOT" ] opensearch_version: [ "3.0.0-SNAPSHOT" ] @@ -64,5 +64,4 @@ jobs: - name: Run NeuralSearch Rolling-Upgrade BWC Tests from BWCVersion-${{ matrix.bwc_version }} to OpenSearch Version-${{ matrix.opensearch_version }} on ${{matrix.os}} run: | echo "Running rolling-upgrade backwards compatibility tests ..." -# Disabling BWC tests due to ongoing build failure. https://github.com/opensearch-project/neural-search/issues/536 -# ./gradlew :qa:rolling-upgrade:testRollingUpgrade -D'tests.bwc.version=${{ matrix.bwc_version }}' + ./gradlew :qa:rolling-upgrade:testRollingUpgrade -D'tests.bwc.version=${{ matrix.bwc_version }}' diff --git a/CHANGELOG.md b/CHANGELOG.md index 922016575..da1efa0b7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Infrastructure - BWC tests for Neural Search ([#515](https://github.com/opensearch-project/neural-search/pull/515)) - Github action to run integ tests in secure opensearch cluster ([#535](https://github.com/opensearch-project/neural-search/pull/535)) +- BWC tests for Multimodal search, Hybrid Search and Neural Sparse Search ([#533](https://github.com/opensearch-project/neural-search/pull/533)) ### Documentation ### Maintenance ### Refactoring diff --git a/qa/restart-upgrade/build.gradle b/qa/restart-upgrade/build.gradle index b817c1395..a396fff29 100644 --- a/qa/restart-upgrade/build.gradle +++ b/qa/restart-upgrade/build.gradle @@ -35,6 +35,15 @@ task testAgainstOldCluster(type: StandaloneRestIntegTestTask) { systemProperty 'tests.skip_delete_model_index', 'true' systemProperty 'tests.plugin_bwc_version', neural_search_bwc_version + //Excluding MultiModalSearchIT, HybridSearchIT, NeuralSparseSearchIT tests from neural search version 2.9 and 2.10 because these features were released in 2.11 version.
+ if (neural_search_bwc_version.startsWith("2.9") || neural_search_bwc_version.startsWith("2.10")){ + filter { + excludeTestsMatching "org.opensearch.neuralsearch.bwc.MultiModalSearchIT.*" + excludeTestsMatching "org.opensearch.neuralsearch.bwc.HybridSearchIT.*" + excludeTestsMatching "org.opensearch.neuralsearch.bwc.NeuralSparseSearchIT.*" + } + } + nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}".allHttpSocketURI.join(",")}") nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}".getName()}") systemProperty 'tests.security.manager', 'false' @@ -53,6 +62,15 @@ task testAgainstNewCluster(type: StandaloneRestIntegTestTask) { systemProperty 'tests.is_old_cluster', 'false' systemProperty 'tests.plugin_bwc_version', neural_search_bwc_version + //Excluding MultiModalSearchIT, HybridSearchIT, NeuralSparseSearchIT tests from neural search version 2.9 and 2.10 because these features were released in 2.11 version. + if (neural_search_bwc_version.startsWith("2.9") || neural_search_bwc_version.startsWith("2.10")){ + filter { + excludeTestsMatching "org.opensearch.neuralsearch.bwc.MultiModalSearchIT.*" + excludeTestsMatching "org.opensearch.neuralsearch.bwc.HybridSearchIT.*" + excludeTestsMatching "org.opensearch.neuralsearch.bwc.NeuralSparseSearchIT.*" + } + } + nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}".allHttpSocketURI.join(",")}") nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}".getName()}") systemProperty 'tests.security.manager', 'false' diff --git a/qa/restart-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/AbstractRestartUpgradeRestTestCase.java b/qa/restart-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/AbstractRestartUpgradeRestTestCase.java index cf985d759..c2d2657f4 100644 --- a/qa/restart-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/AbstractRestartUpgradeRestTestCase.java +++ b/qa/restart-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/AbstractRestartUpgradeRestTestCase.java @@ -4,15 +4,18 @@ */ package org.opensearch.neuralsearch.bwc; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.Locale; import java.util.Optional; import org.junit.Before; import org.opensearch.common.settings.Settings; import org.opensearch.neuralsearch.BaseNeuralSearchIT; +import static org.opensearch.neuralsearch.TestUtils.NEURAL_SEARCH_BWC_PREFIX; import static org.opensearch.neuralsearch.TestUtils.CLIENT_TIMEOUT_VALUE; import static org.opensearch.neuralsearch.TestUtils.RESTART_UPGRADE_OLD_CLUSTER; import static org.opensearch.neuralsearch.TestUtils.BWC_VERSION; -import static org.opensearch.neuralsearch.TestUtils.NEURAL_SEARCH_BWC_PREFIX; +import static org.opensearch.neuralsearch.TestUtils.generateModelId; import org.opensearch.test.rest.OpenSearchRestTestCase; public abstract class AbstractRestartUpgradeRestTestCase extends BaseNeuralSearchIT { @@ -57,4 +60,43 @@ protected static final boolean isRunningAgainstOldCluster() { protected final Optional getBWCVersion() { return Optional.ofNullable(System.getProperty(BWC_VERSION, null)); } + + protected String uploadTextEmbeddingModel() throws Exception { + String requestBody = Files.readString(Path.of(classLoader.getResource("processor/UploadModelRequestBody.json").toURI())); + return registerModelGroupAndGetModelId(requestBody); + } + + protected String registerModelGroupAndGetModelId(final String requestBody) throws Exception { + String 
modelGroupRegisterRequestBody = Files.readString( + Path.of(classLoader.getResource("processor/CreateModelGroupRequestBody.json").toURI()) + ); + String modelGroupId = registerModelGroup(String.format(LOCALE, modelGroupRegisterRequestBody, generateModelId())); + return uploadModel(String.format(LOCALE, requestBody, modelGroupId)); + } + + protected void createPipelineProcessor(final String modelId, final String pipelineName) throws Exception { + String requestBody = Files.readString(Path.of(classLoader.getResource("processor/PipelineConfiguration.json").toURI())); + createPipelineProcessor(requestBody, pipelineName, modelId); + } + + protected String uploadSparseEncodingModel() throws Exception { + String requestBody = Files.readString( + Path.of(classLoader.getResource("processor/UploadSparseEncodingModelRequestBody.json").toURI()) + ); + return registerModelGroupAndGetModelId(requestBody); + } + + protected void createPipelineForTextImageProcessor(final String modelId, final String pipelineName) throws Exception { + String requestBody = Files.readString( + Path.of(classLoader.getResource("processor/PipelineForTextImageProcessorConfiguration.json").toURI()) + ); + createPipelineProcessor(requestBody, pipelineName, modelId); + } + + protected void createPipelineForSparseEncodingProcessor(final String modelId, final String pipelineName) throws Exception { + String requestBody = Files.readString( + Path.of(classLoader.getResource("processor/PipelineForSparseEncodingProcessorConfiguration.json").toURI()) + ); + createPipelineProcessor(requestBody, pipelineName, modelId); + } } diff --git a/qa/restart-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/HybridSearchIT.java b/qa/restart-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/HybridSearchIT.java new file mode 100644 index 000000000..48735182a --- /dev/null +++ b/qa/restart-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/HybridSearchIT.java @@ -0,0 +1,129 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.neuralsearch.bwc; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import org.opensearch.index.query.MatchQueryBuilder; +import static org.opensearch.neuralsearch.TestUtils.getModelId; +import static org.opensearch.neuralsearch.TestUtils.NODES_BWC_CLUSTER; +import static org.opensearch.neuralsearch.TestUtils.PARAM_NAME_WEIGHTS; +import static org.opensearch.neuralsearch.TestUtils.TEXT_EMBEDDING_PROCESSOR; +import static org.opensearch.neuralsearch.TestUtils.DEFAULT_NORMALIZATION_METHOD; +import static org.opensearch.neuralsearch.TestUtils.DEFAULT_COMBINATION_METHOD; +import org.opensearch.neuralsearch.query.HybridQueryBuilder; +import org.opensearch.neuralsearch.query.NeuralQueryBuilder; + +public class HybridSearchIT extends AbstractRestartUpgradeRestTestCase { + private static final String PIPELINE_NAME = "nlp-hybrid-pipeline"; + private static final String PIPELINE1_NAME = "nlp-hybrid-1-pipeline"; + private static final String SEARCH_PIPELINE_NAME = "nlp-search-pipeline"; + private static final String SEARCH_PIPELINE1_NAME = "nlp-search-1-pipeline"; + private static final String TEST_FIELD = "passage_text"; + private static final String TEXT_1 = "Hello world"; + private static final String TEXT_2 = "Hi planet"; + private static final String TEXT_3 = "Hi earth"; + private static final String TEXT_4 = "Hi amazon"; + private static final String TEXT_5 
= "Hi mars"; + private static final String TEXT_6 = "Hi opensearch"; + private static final String QUERY = "Hi world"; + + // Test restart-upgrade normalization processor when index with multiple shards + // Create Text Embedding Processor, Ingestion Pipeline, add document and search pipeline with normalization processor + // Validate process , pipeline and document count in restart-upgrade scenario + public void testNormalizationProcessor_whenIndexWithMultipleShards_E2EFlow() throws Exception { + validateNormalizationProcessor("processor/IndexMappingMultipleShard.json", PIPELINE_NAME, SEARCH_PIPELINE_NAME); + } + + // Test restart-upgrade normalization processor when index with single shard + // Create Text Embedding Processor, Ingestion Pipeline, add document and search pipeline with normalization processor + // Validate process , pipeline and document count in restart-upgrade scenario + public void testNormalizationProcessor_whenIndexWithSingleShard_E2EFlow() throws Exception { + validateNormalizationProcessor("processor/IndexMappingSingleShard.json", PIPELINE1_NAME, SEARCH_PIPELINE1_NAME); + } + + private void validateNormalizationProcessor(final String fileName, final String pipelineName, final String searchPipelineName) + throws Exception { + waitForClusterHealthGreen(NODES_BWC_CLUSTER); + if (isRunningAgainstOldCluster()) { + String modelId = uploadTextEmbeddingModel(); + loadModel(modelId); + createPipelineProcessor(modelId, pipelineName); + createIndexWithConfiguration( + getIndexNameForTest(), + Files.readString(Path.of(classLoader.getResource(fileName).toURI())), + pipelineName + ); + addDocuments(getIndexNameForTest(), true); + createSearchPipeline(searchPipelineName); + } else { + String modelId = null; + try { + modelId = getModelId(getIngestionPipeline(pipelineName), TEXT_EMBEDDING_PROCESSOR); + loadModel(modelId); + addDocuments(getIndexNameForTest(), false); + validateTestIndex(modelId, getIndexNameForTest(), searchPipelineName); + } finally { + wipeOfTestResources(getIndexNameForTest(), pipelineName, modelId, searchPipelineName); + } + } + } + + private void addDocuments(final String indexName, boolean isRunningAgainstOldCluster) throws IOException { + if (isRunningAgainstOldCluster) { + addDocument(indexName, "0", TEST_FIELD, TEXT_1, null, null); + addDocument(indexName, "1", TEST_FIELD, TEXT_2, null, null); + addDocument(indexName, "2", TEST_FIELD, TEXT_3, null, null); + addDocument(indexName, "3", TEST_FIELD, TEXT_4, null, null); + addDocument(indexName, "4", TEST_FIELD, TEXT_5, null, null); + } else { + addDocument(indexName, "5", TEST_FIELD, TEXT_6, null, null); + } + } + + private void createSearchPipeline(final String pipelineName) { + createSearchPipeline( + pipelineName, + DEFAULT_NORMALIZATION_METHOD, + DEFAULT_COMBINATION_METHOD, + Map.of(PARAM_NAME_WEIGHTS, Arrays.toString(new float[] { 0.3f, 0.7f })) + ); + } + + private void validateTestIndex(final String modelId, final String index, final String searchPipeline) throws Exception { + int docCount = getDocCount(index); + assertEquals(6, docCount); + HybridQueryBuilder hybridQueryBuilder = getQueryBuilder(modelId); + Map searchResponseAsMap = search(index, hybridQueryBuilder, null, 1, Map.of("search_pipeline", searchPipeline)); + assertNotNull(searchResponseAsMap); + int hits = getHitCount(searchResponseAsMap); + assertEquals(1, hits); + List scoresList = getNormalizationScoreList(searchResponseAsMap); + for (Double score : scoresList) { + assertTrue(0 <= score && score <= 2); + } + } + + private 
HybridQueryBuilder getQueryBuilder(final String modelId) { + NeuralQueryBuilder neuralQueryBuilder = new NeuralQueryBuilder(); + neuralQueryBuilder.fieldName("passage_embedding"); + neuralQueryBuilder.modelId(modelId); + neuralQueryBuilder.queryText(QUERY); + neuralQueryBuilder.k(5); + + MatchQueryBuilder matchQueryBuilder = new MatchQueryBuilder("text", QUERY); + + HybridQueryBuilder hybridQueryBuilder = new HybridQueryBuilder(); + hybridQueryBuilder.add(matchQueryBuilder); + hybridQueryBuilder.add(neuralQueryBuilder); + + return hybridQueryBuilder; + } + +} diff --git a/qa/restart-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/MultiModalSearchIT.java b/qa/restart-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/MultiModalSearchIT.java new file mode 100644 index 000000000..e6749d778 --- /dev/null +++ b/qa/restart-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/MultiModalSearchIT.java @@ -0,0 +1,61 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.neuralsearch.bwc; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Map; +import static org.opensearch.neuralsearch.TestUtils.NODES_BWC_CLUSTER; +import static org.opensearch.neuralsearch.TestUtils.TEXT_IMAGE_EMBEDDING_PROCESSOR; +import static org.opensearch.neuralsearch.TestUtils.getModelId; +import org.opensearch.neuralsearch.query.NeuralQueryBuilder; + +public class MultiModalSearchIT extends AbstractRestartUpgradeRestTestCase { + private static final String PIPELINE_NAME = "nlp-ingest-pipeline"; + private static final String TEST_FIELD = "passage_text"; + private static final String TEST_IMAGE_FIELD = "passage_image"; + private static final String TEXT = "Hello world"; + private static final String TEXT_1 = "Hello world a"; + private static final String TEST_IMAGE_TEXT = "/9j/4AAQSkZJRgABAQAASABIAAD"; + private static final String TEST_IMAGE_TEXT_1 = "/9j/4AAQSkZJRgbdwoeicfhoid"; + + // Test restart-upgrade text image embedding processor + // Create Text Image Embedding Processor, Ingestion Pipeline and add document + // Validate processor, pipeline and document count in restart-upgrade scenario + public void testTextImageEmbeddingProcessor_E2EFlow() throws Exception { + waitForClusterHealthGreen(NODES_BWC_CLUSTER); + + if (isRunningAgainstOldCluster()) { + String modelId = uploadTextEmbeddingModel(); + loadModel(modelId); + createPipelineForTextImageProcessor(modelId, PIPELINE_NAME); + createIndexWithConfiguration( + getIndexNameForTest(), + Files.readString(Path.of(classLoader.getResource("processor/IndexMappingMultipleShard.json").toURI())), + PIPELINE_NAME + ); + addDocument(getIndexNameForTest(), "0", TEST_FIELD, TEXT, TEST_IMAGE_FIELD, TEST_IMAGE_TEXT); + } else { + String modelId = null; + try { + modelId = getModelId(getIngestionPipeline(PIPELINE_NAME), TEXT_IMAGE_EMBEDDING_PROCESSOR); + loadModel(modelId); + addDocument(getIndexNameForTest(), "1", TEST_FIELD, TEXT_1, TEST_IMAGE_FIELD, TEST_IMAGE_TEXT_1); + validateTestIndex(modelId); + } finally { + wipeOfTestResources(getIndexNameForTest(), PIPELINE_NAME, modelId, null); + } + } + } + + private void validateTestIndex(final String modelId) throws Exception { + int docCount = getDocCount(getIndexNameForTest()); + assertEquals(2, docCount); + NeuralQueryBuilder neuralQueryBuilder = new NeuralQueryBuilder("passage_embedding", TEXT, TEST_IMAGE_TEXT, modelId, 1, null, null); + Map response = search(getIndexNameForTest(), neuralQueryBuilder, 1); + assertNotNull(response); + } + +}
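The restart-upgrade suites above, and the sparse-encoding one that follows, share a single two-phase shape: the run against the old cluster seeds a model, an ingest pipeline, an index, and the first documents, and the run against the restarted cluster recovers the model id from the pipeline that survived the upgrade, ingests one more document, validates, and only then cleans up. A minimal sketch of that skeleton, assuming the helpers this patch adds to AbstractRestartUpgradeRestTestCase and TestUtils; the pipeline name and the elided ingest/validation steps are illustrative, not part of the patch:

package org.opensearch.neuralsearch.bwc;

import static org.opensearch.neuralsearch.TestUtils.NODES_BWC_CLUSTER;
import static org.opensearch.neuralsearch.TestUtils.TEXT_EMBEDDING_PROCESSOR;
import static org.opensearch.neuralsearch.TestUtils.getModelId;

public class ExampleRestartUpgradeIT extends AbstractRestartUpgradeRestTestCase {
    // Hypothetical pipeline name, used only for this sketch.
    private static final String PIPELINE_NAME = "example-ingest-pipeline";

    public void testProcessor_E2EFlow() throws Exception {
        waitForClusterHealthGreen(NODES_BWC_CLUSTER);
        if (isRunningAgainstOldCluster()) {
            // Phase 1 (old cluster): seed every resource the upgraded cluster will validate.
            String modelId = uploadTextEmbeddingModel();
            loadModel(modelId);
            createPipelineProcessor(modelId, PIPELINE_NAME);
            // ... create the index bound to PIPELINE_NAME and ingest the first document(s)
        } else {
            // Phase 2 (restarted cluster): the model id is not known statically, so it is
            // recovered from the ingest pipeline definition that survived the upgrade.
            String modelId = null;
            try {
                modelId = getModelId(getIngestionPipeline(PIPELINE_NAME), TEXT_EMBEDDING_PROCESSOR);
                loadModel(modelId);
                // ... ingest one more document, then assert doc count and query results
            } finally {
                // Cleanup runs only in the second phase, so old-cluster state survives the restart.
                wipeOfTestResources(getIndexNameForTest(), PIPELINE_NAME, modelId, null);
            }
        }
    }
}

Keeping cleanup in a finally block is the same change the patch makes to the existing SemanticSearchIT, so a failed validation no longer leaks the model, pipeline, or index.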
diff --git a/qa/restart-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/NeuralSparseSearchIT.java b/qa/restart-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/NeuralSparseSearchIT.java new file mode 100644 index 000000000..22bd4a281 --- /dev/null +++ b/qa/restart-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/NeuralSparseSearchIT.java @@ -0,0 +1,90 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.neuralsearch.bwc; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; +import java.util.Map; +import org.opensearch.index.query.BoolQueryBuilder; +import org.opensearch.index.query.MatchQueryBuilder; +import org.opensearch.neuralsearch.TestUtils; +import static org.opensearch.neuralsearch.TestUtils.NODES_BWC_CLUSTER; +import static org.opensearch.neuralsearch.TestUtils.SPARSE_ENCODING_PROCESSOR; +import static org.opensearch.neuralsearch.TestUtils.objectToFloat; +import org.opensearch.neuralsearch.query.NeuralSparseQueryBuilder; + +public class NeuralSparseSearchIT extends AbstractRestartUpgradeRestTestCase { + private static final String PIPELINE_NAME = "nlp-ingest-pipeline-sparse"; + private static final String TEST_SPARSE_ENCODING_FIELD = "passage_embedding"; + private static final String TEST_TEXT_FIELD = "passage_text"; + private static final String TEXT_1 = "Hello world a b"; + private static final String TEXT_2 = "Hello planet"; + private static final List TEST_TOKENS_1 = List.of("hello", "world", "a", "b", "c"); + private static final List TEST_TOKENS_2 = List.of("hello", "planet", "a", "b", "c"); + private final Map testRankFeaturesDoc1 = TestUtils.createRandomTokenWeightMap(TEST_TOKENS_1); + private final Map testRankFeaturesDoc2 = TestUtils.createRandomTokenWeightMap(TEST_TOKENS_2); + + // Test restart-upgrade sparse encoding processor + // Create Sparse Encoding Processor, Ingestion Pipeline and add document + // Validate processor, pipeline and document count in restart-upgrade scenario + public void testSparseEncodingProcessor_E2EFlow() throws Exception { + waitForClusterHealthGreen(NODES_BWC_CLUSTER); + if (isRunningAgainstOldCluster()) { + String modelId = uploadSparseEncodingModel(); + loadModel(modelId); + createPipelineForSparseEncodingProcessor(modelId, PIPELINE_NAME); + createIndexWithConfiguration( + getIndexNameForTest(), + Files.readString(Path.of(classLoader.getResource("processor/SparseIndexMappings.json").toURI())), + PIPELINE_NAME + ); + + addSparseEncodingDoc( + getIndexNameForTest(), + "0", + List.of(TEST_SPARSE_ENCODING_FIELD), + List.of(testRankFeaturesDoc1), + List.of(TEST_TEXT_FIELD), + List.of(TEXT_1) + ); + } else { + String modelId = null; + try { + modelId = TestUtils.getModelId(getIngestionPipeline(PIPELINE_NAME), SPARSE_ENCODING_PROCESSOR); + loadModel(modelId); + addSparseEncodingDoc( + getIndexNameForTest(), + "1", + List.of(TEST_SPARSE_ENCODING_FIELD), + List.of(testRankFeaturesDoc2), + List.of(TEST_TEXT_FIELD), + List.of(TEXT_2) + ); + validateTestIndex(modelId); + } finally { + wipeOfTestResources(getIndexNameForTest(), PIPELINE_NAME, modelId, null); + } + } + + } + + private void validateTestIndex(final String modelId) throws Exception { + int docCount = getDocCount(getIndexNameForTest()); + assertEquals(2, docCount); + BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder(); + NeuralSparseQueryBuilder sparseEncodingQueryBuilder = new NeuralSparseQueryBuilder().fieldName(TEST_SPARSE_ENCODING_FIELD) + .queryText(TEXT_1) +
.modelId(modelId); + MatchQueryBuilder matchQueryBuilder = new MatchQueryBuilder(TEST_TEXT_FIELD, TEXT_1); + boolQueryBuilder.should(sparseEncodingQueryBuilder).should(matchQueryBuilder); + Map response = search(getIndexNameForTest(), boolQueryBuilder, 1); + Map firstInnerHit = getFirstInnerHit(response); + + assertEquals("0", firstInnerHit.get("_id")); + float minExpectedScore = computeExpectedScore(modelId, testRankFeaturesDoc1, TEXT_1); + assertTrue(minExpectedScore < objectToFloat(firstInnerHit.get("_score"))); + } +} diff --git a/qa/restart-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/SemanticSearchIT.java b/qa/restart-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/SemanticSearchIT.java index dc8a94236..ec5938cd9 100644 --- a/qa/restart-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/SemanticSearchIT.java +++ b/qa/restart-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/SemanticSearchIT.java @@ -4,13 +4,10 @@ */ package org.opensearch.neuralsearch.bwc; -import com.carrotsearch.randomizedtesting.RandomizedTest; import java.nio.file.Files; import java.nio.file.Path; import java.util.Map; - import static org.opensearch.neuralsearch.TestUtils.NODES_BWC_CLUSTER; - import static org.opensearch.neuralsearch.TestUtils.getModelId; import static org.opensearch.neuralsearch.TestUtils.TEXT_EMBEDDING_PROCESSOR; import org.opensearch.neuralsearch.query.NeuralQueryBuilder; @@ -34,27 +31,26 @@ public void testTextEmbeddingProcessor_E2EFlow() throws Exception { createPipelineProcessor(modelId, PIPELINE_NAME); createIndexWithConfiguration( getIndexNameForTest(), - Files.readString(Path.of(classLoader.getResource("processor/IndexMappings.json").toURI())), + Files.readString(Path.of(classLoader.getResource("processor/IndexMappingMultipleShard.json").toURI())), PIPELINE_NAME ); - addDocument(getIndexNameForTest(), "0", TEST_FIELD, TEXT); + addDocument(getIndexNameForTest(), "0", TEST_FIELD, TEXT, null, null); } else { - Map pipeline = getIngestionPipeline(PIPELINE_NAME); - assertNotNull(pipeline); - String modelId = getModelId(pipeline, TEXT_EMBEDDING_PROCESSOR); - loadModel(modelId); - addDocument(getIndexNameForTest(), "1", TEST_FIELD, TEXT_1); - validateTestIndex(modelId); - deletePipeline(PIPELINE_NAME); - deleteModel(modelId); - deleteIndex(getIndexNameForTest()); + String modelId = null; + try { + modelId = getModelId(getIngestionPipeline(PIPELINE_NAME), TEXT_EMBEDDING_PROCESSOR); + loadModel(modelId); + addDocument(getIndexNameForTest(), "1", TEST_FIELD, TEXT_1, null, null); + validateTestIndex(modelId); + } finally { + wipeOfTestResources(getIndexNameForTest(), PIPELINE_NAME, modelId, null); + } } } - private void validateTestIndex(String modelId) throws Exception { + private void validateTestIndex(final String modelId) throws Exception { int docCount = getDocCount(getIndexNameForTest()); assertEquals(2, docCount); - loadModel(modelId); NeuralQueryBuilder neuralQueryBuilder = new NeuralQueryBuilder(); neuralQueryBuilder.fieldName("passage_embedding"); neuralQueryBuilder.modelId(modelId); @@ -63,25 +59,4 @@ private void validateTestIndex(String modelId) throws Exception { Map response = search(getIndexNameForTest(), neuralQueryBuilder, 1); assertNotNull(response); } - - private String uploadTextEmbeddingModel() throws Exception { - String requestBody = Files.readString(Path.of(classLoader.getResource("processor/UploadModelRequestBody.json").toURI())); - return registerModelGroupAndGetModelId(requestBody); - } - - private String registerModelGroupAndGetModelId(String 
requestBody) throws Exception { - String modelGroupRegisterRequestBody = Files.readString( - Path.of(classLoader.getResource("processor/CreateModelGroupRequestBody.json").toURI()) - ); - String modelGroupId = registerModelGroup( - String.format(LOCALE, modelGroupRegisterRequestBody, "public_model_" + RandomizedTest.randomAsciiAlphanumOfLength(8)) - ); - return uploadModel(String.format(LOCALE, requestBody, modelGroupId)); - } - - protected void createPipelineProcessor(String modelId, String pipelineName) throws Exception { - String requestBody = Files.readString(Path.of(classLoader.getResource("processor/PipelineConfiguration.json").toURI())); - createPipelineProcessor(requestBody, pipelineName, modelId); - } - } diff --git a/qa/restart-upgrade/src/test/resources/processor/IndexMappings.json b/qa/restart-upgrade/src/test/resources/processor/IndexMappingMultipleShard.json similarity index 91% rename from qa/restart-upgrade/src/test/resources/processor/IndexMappings.json rename to qa/restart-upgrade/src/test/resources/processor/IndexMappingMultipleShard.json index fc4c13646..86675a4d2 100644 --- a/qa/restart-upgrade/src/test/resources/processor/IndexMappings.json +++ b/qa/restart-upgrade/src/test/resources/processor/IndexMappingMultipleShard.json @@ -2,7 +2,6 @@ "settings": { "index": { "knn": true, - "knn.algo_param.ef_search": 100, "refresh_interval": "30s", "default_pipeline": "%s" }, @@ -26,6 +25,9 @@ }, "passage_text": { "type": "text" + }, + "passage_image": { + "type": "text" } } } diff --git a/qa/restart-upgrade/src/test/resources/processor/IndexMappingSingleShard.json b/qa/restart-upgrade/src/test/resources/processor/IndexMappingSingleShard.json new file mode 100644 index 000000000..3baf17c0f --- /dev/null +++ b/qa/restart-upgrade/src/test/resources/processor/IndexMappingSingleShard.json @@ -0,0 +1,34 @@ +{ + "settings": { + "index": { + "knn": true, + "refresh_interval": "30s", + "default_pipeline": "%s" + }, + "number_of_shards": 1, + "number_of_replicas": 0 + }, + "mappings": { + "properties": { + "passage_embedding": { + "type": "knn_vector", + "dimension": 768, + "method": { + "name": "hnsw", + "space_type": "l2", + "engine": "lucene", + "parameters": { + "ef_construction": 128, + "m": 24 + } + } + }, + "passage_text": { + "type": "text" + }, + "passage_image": { + "type": "text" + } + } + } +} diff --git a/qa/restart-upgrade/src/test/resources/processor/PipelineForSparseEncodingProcessorConfiguration.json b/qa/restart-upgrade/src/test/resources/processor/PipelineForSparseEncodingProcessorConfiguration.json new file mode 100644 index 000000000..fe885a0a2 --- /dev/null +++ b/qa/restart-upgrade/src/test/resources/processor/PipelineForSparseEncodingProcessorConfiguration.json @@ -0,0 +1,13 @@ +{ + "description": "A sparse encoding ingest pipeline", + "processors": [ + { + "sparse_encoding": { + "model_id": "%s", + "field_map": { + "passage_text": "passage_embedding" + } + } + } + ] +} diff --git a/qa/restart-upgrade/src/test/resources/processor/PipelineForTextImageProcessorConfiguration.json b/qa/restart-upgrade/src/test/resources/processor/PipelineForTextImageProcessorConfiguration.json new file mode 100644 index 000000000..60d5dc051 --- /dev/null +++ b/qa/restart-upgrade/src/test/resources/processor/PipelineForTextImageProcessorConfiguration.json @@ -0,0 +1,15 @@ +{ + "description": "text image embedding pipeline", + "processors": [ + { + "text_image_embedding": { + "model_id": "%s", + "embedding": "passage_embedding", + "field_map": { + "text": "passage_text", + "image":
"passage_image" + } + } + } + ] +} diff --git a/qa/restart-upgrade/src/test/resources/processor/SparseIndexMappings.json b/qa/restart-upgrade/src/test/resources/processor/SparseIndexMappings.json new file mode 100644 index 000000000..c3ec5951f --- /dev/null +++ b/qa/restart-upgrade/src/test/resources/processor/SparseIndexMappings.json @@ -0,0 +1,17 @@ +{ + "settings": { + "default_pipeline": "%s", + "number_of_shards": 3, + "number_of_replicas": 1 + }, + "mappings": { + "properties": { + "passage_embedding": { + "type": "rank_features" + }, + "passage_text": { + "type": "text" + } + } + } +} diff --git a/qa/restart-upgrade/src/test/resources/processor/UploadSparseEncodingModelRequestBody.json b/qa/restart-upgrade/src/test/resources/processor/UploadSparseEncodingModelRequestBody.json new file mode 100644 index 000000000..6ca51e50e --- /dev/null +++ b/qa/restart-upgrade/src/test/resources/processor/UploadSparseEncodingModelRequestBody.json @@ -0,0 +1,10 @@ +{ + "name": "tokenize-idf-0915", + "version": "1.0.0", + "function_name": "SPARSE_TOKENIZE", + "description": "test model", + "model_format": "TORCH_SCRIPT", + "model_group_id": "%s", + "model_content_hash_value": "b345e9e943b62c405a8dd227ef2c46c84c5ff0a0b71b584be9132b37bce91a9a", + "url": "https://github.com/opensearch-project/ml-commons/raw/main/ml-algorithms/src/test/resources/org/opensearch/ml/engine/algorithms/sparse_encoding/sparse_demo.zip" + } diff --git a/qa/rolling-upgrade/build.gradle b/qa/rolling-upgrade/build.gradle index 51f03695c..6aff5b499 100644 --- a/qa/rolling-upgrade/build.gradle +++ b/qa/rolling-upgrade/build.gradle @@ -34,6 +34,16 @@ task testAgainstOldCluster(type: StandaloneRestIntegTestTask) { systemProperty 'tests.rest.bwcsuite_cluster', 'old_cluster' systemProperty 'tests.plugin_bwc_version', neural_search_bwc_version systemProperty 'tests.skip_delete_model_index', 'true' + + //Excluding MultiModalSearchIT, HybridSearchIT, NeuralSparseSearchIT tests from neural search version 2.9 and 2.10 because these features were released in 2.11 version. + if (neural_search_bwc_version.startsWith("2.9") || neural_search_bwc_version.startsWith("2.10")){ + filter { + excludeTestsMatching "org.opensearch.neuralsearch.bwc.MultiModalSearchIT.*" + excludeTestsMatching "org.opensearch.neuralsearch.bwc.HybridSearchIT.*" + excludeTestsMatching "org.opensearch.neuralsearch.bwc.NeuralSparseSearchIT.*" + } + } + nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}".allHttpSocketURI.join(",")}") nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}".getName()}") systemProperty 'tests.security.manager', 'false' @@ -52,6 +62,16 @@ task testAgainstOneThirdUpgradedCluster(type: StandaloneRestIntegTestTask) { systemProperty 'tests.rest.first_round', 'true' systemProperty 'tests.skip_delete_model_index', 'true' systemProperty 'tests.plugin_bwc_version', neural_search_bwc_version + + //Excluding MultiModalSearchIT, HybridSearchIT, NeuralSparseSearchIT tests from neural search version 2.9 and 2.10 because these features were released in 2.11 version. 
+ if (neural_search_bwc_version.startsWith("2.9") || neural_search_bwc_version.startsWith("2.10")){ + filter { + excludeTestsMatching "org.opensearch.neuralsearch.bwc.MultiModalSearchIT.*" + excludeTestsMatching "org.opensearch.neuralsearch.bwc.HybridSearchIT.*" + excludeTestsMatching "org.opensearch.neuralsearch.bwc.NeuralSparseSearchIT.*" + } + } + nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}".allHttpSocketURI.join(",")}") nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}".getName()}") systemProperty 'tests.security.manager', 'false' @@ -69,6 +89,16 @@ task testAgainstTwoThirdsUpgradedCluster(type: StandaloneRestIntegTestTask) { systemProperty 'tests.rest.first_round', 'false' systemProperty 'tests.skip_delete_model_index', 'true' systemProperty 'tests.plugin_bwc_version', neural_search_bwc_version + + //Excluding MultiModalSearchIT, HybridSearchIT, NeuralSparseSearchIT tests from neural search version 2.9 and 2.10 because these features were released in 2.11 version. + if (neural_search_bwc_version.startsWith("2.9") || neural_search_bwc_version.startsWith("2.10")){ + filter { + excludeTestsMatching "org.opensearch.neuralsearch.bwc.MultiModalSearchIT.*" + excludeTestsMatching "org.opensearch.neuralsearch.bwc.HybridSearchIT.*" + excludeTestsMatching "org.opensearch.neuralsearch.bwc.NeuralSparseSearchIT.*" + } + } + nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}".allHttpSocketURI.join(",")}") nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}".getName()}") systemProperty 'tests.security.manager', 'false' @@ -86,6 +116,16 @@ task testRollingUpgrade(type: StandaloneRestIntegTestTask) { systemProperty 'tests.rest.bwcsuite_cluster', 'upgraded_cluster' systemProperty 'tests.skip_delete_model_index', 'true' systemProperty 'tests.plugin_bwc_version', neural_search_bwc_version + + //Excluding MultiModalSearchIT, HybridSearchIT, NeuralSparseSearchIT tests from neural search version 2.9 and 2.10 because these features were released in 2.11 version. 
+ if (neural_search_bwc_version.startsWith("2.9") || neural_search_bwc_version.startsWith("2.10")){ + filter { + excludeTestsMatching "org.opensearch.neuralsearch.bwc.MultiModalSearchIT.*" + excludeTestsMatching "org.opensearch.neuralsearch.bwc.HybridSearchIT.*" + excludeTestsMatching "org.opensearch.neuralsearch.bwc.NeuralSparseSearchIT.*" + } + } + nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}".allHttpSocketURI.join(",")}") nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}".getName()}") systemProperty 'tests.security.manager', 'false' diff --git a/qa/rolling-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/AbstractRollingUpgradeTestCase.java b/qa/rolling-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/AbstractRollingUpgradeTestCase.java index 98ce95b72..16ed2d229 100644 --- a/qa/rolling-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/AbstractRollingUpgradeTestCase.java +++ b/qa/rolling-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/AbstractRollingUpgradeTestCase.java @@ -4,19 +4,22 @@ */ package org.opensearch.neuralsearch.bwc; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.Locale; import java.util.Optional; import org.junit.Before; import org.opensearch.common.settings.Settings; import org.opensearch.neuralsearch.BaseNeuralSearchIT; -import org.opensearch.test.rest.OpenSearchRestTestCase; +import static org.opensearch.neuralsearch.TestUtils.NEURAL_SEARCH_BWC_PREFIX; import static org.opensearch.neuralsearch.TestUtils.OLD_CLUSTER; import static org.opensearch.neuralsearch.TestUtils.MIXED_CLUSTER; import static org.opensearch.neuralsearch.TestUtils.UPGRADED_CLUSTER; -import static org.opensearch.neuralsearch.TestUtils.BWC_VERSION; import static org.opensearch.neuralsearch.TestUtils.ROLLING_UPGRADE_FIRST_ROUND; import static org.opensearch.neuralsearch.TestUtils.BWCSUITE_CLUSTER; -import static org.opensearch.neuralsearch.TestUtils.NEURAL_SEARCH_BWC_PREFIX; +import static org.opensearch.neuralsearch.TestUtils.BWC_VERSION; +import static org.opensearch.neuralsearch.TestUtils.generateModelId; +import org.opensearch.test.rest.OpenSearchRestTestCase; public abstract class AbstractRollingUpgradeTestCase extends BaseNeuralSearchIT { @@ -84,4 +87,47 @@ protected final Optional getBWCVersion() { return Optional.ofNullable(System.getProperty(BWC_VERSION, null)); } + protected String uploadTextEmbeddingModel() throws Exception { + String requestBody = Files.readString(Path.of(classLoader.getResource("processor/UploadModelRequestBody.json").toURI())); + return registerModelGroupAndGetModelId(requestBody); + } + + protected String registerModelGroupAndGetModelId(String requestBody) throws Exception { + String modelGroupRegisterRequestBody = Files.readString( + Path.of(classLoader.getResource("processor/CreateModelGroupRequestBody.json").toURI()) + ); + String modelGroupId = registerModelGroup(String.format(LOCALE, modelGroupRegisterRequestBody, generateModelId())); + return uploadModel(String.format(LOCALE, requestBody, modelGroupId)); + } + + protected void createPipelineProcessor(String modelId, String pipelineName) throws Exception { + String requestBody = Files.readString(Path.of(classLoader.getResource("processor/PipelineConfiguration.json").toURI())); + createPipelineProcessor(requestBody, pipelineName, modelId); + } + + protected String uploadTextImageEmbeddingModel() throws Exception { + String requestBody = 
Files.readString(Path.of(classLoader.getResource("processor/UploadModelRequestBody.json").toURI())); + return registerModelGroupAndGetModelId(requestBody); + } + + protected void createPipelineForTextImageProcessor(String modelId, String pipelineName) throws Exception { + String requestBody = Files.readString( + Path.of(classLoader.getResource("processor/PipelineForTextImageProcessorConfiguration.json").toURI()) + ); + createPipelineProcessor(requestBody, pipelineName, modelId); + } + + protected String uploadSparseEncodingModel() throws Exception { + String requestBody = Files.readString( + Path.of(classLoader.getResource("processor/UploadSparseEncodingModelRequestBody.json").toURI()) + ); + return registerModelGroupAndGetModelId(requestBody); + } + + protected void createPipelineForSparseEncodingProcessor(String modelId, String pipelineName) throws Exception { + String requestBody = Files.readString( + Path.of(classLoader.getResource("processor/PipelineForSparseEncodingProcessorConfiguration.json").toURI()) + ); + createPipelineProcessor(requestBody, pipelineName, modelId); + } } diff --git a/qa/rolling-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/HybridSearchIT.java b/qa/rolling-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/HybridSearchIT.java new file mode 100644 index 000000000..292540820 --- /dev/null +++ b/qa/rolling-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/HybridSearchIT.java @@ -0,0 +1,121 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.neuralsearch.bwc; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import org.opensearch.index.query.MatchQueryBuilder; +import static org.opensearch.neuralsearch.TestUtils.NODES_BWC_CLUSTER; +import static org.opensearch.neuralsearch.TestUtils.PARAM_NAME_WEIGHTS; +import static org.opensearch.neuralsearch.TestUtils.TEXT_EMBEDDING_PROCESSOR; +import static org.opensearch.neuralsearch.TestUtils.DEFAULT_NORMALIZATION_METHOD; +import static org.opensearch.neuralsearch.TestUtils.DEFAULT_COMBINATION_METHOD; +import static org.opensearch.neuralsearch.TestUtils.getModelId; +import org.opensearch.neuralsearch.query.HybridQueryBuilder; +import org.opensearch.neuralsearch.query.NeuralQueryBuilder; + +public class HybridSearchIT extends AbstractRollingUpgradeTestCase { + + private static final String PIPELINE_NAME = "nlp-hybrid-pipeline"; + private static final String SEARCH_PIPELINE_NAME = "nlp-search-pipeline"; + private static final String TEST_FIELD = "passage_text"; + private static final String TEXT = "Hello world"; + private static final String TEXT_MIXED = "Hi planet"; + private static final String TEXT_UPGRADED = "Hi earth"; + private static final String QUERY = "Hi world"; + private static final int NUM_DOCS_PER_ROUND = 1; + private static String modelId = ""; + + // Test rolling-upgrade normalization processor when the index has multiple shards + // Create Text Embedding Processor, Ingestion Pipeline, add document and search pipeline with normalization processor + // Validate processor, pipeline and document count in rolling-upgrade scenario + public void testNormalizationProcessor_whenIndexWithMultipleShards_E2EFlow() throws Exception { + waitForClusterHealthGreen(NODES_BWC_CLUSTER); + switch (getClusterType()) { + case OLD: + modelId = uploadTextEmbeddingModel(); + loadModel(modelId); + createPipelineProcessor(modelId, PIPELINE_NAME); + createIndexWithConfiguration( +
getIndexNameForTest(), + Files.readString(Path.of(classLoader.getResource("processor/IndexMappings.json").toURI())), + PIPELINE_NAME + ); + addDocument(getIndexNameForTest(), "0", TEST_FIELD, TEXT, null, null); + createSearchPipeline( + SEARCH_PIPELINE_NAME, + DEFAULT_NORMALIZATION_METHOD, + DEFAULT_COMBINATION_METHOD, + Map.of(PARAM_NAME_WEIGHTS, Arrays.toString(new float[] { 0.3f, 0.7f })) + ); + break; + case MIXED: + modelId = getModelId(getIngestionPipeline(PIPELINE_NAME), TEXT_EMBEDDING_PROCESSOR); + int totalDocsCountMixed; + if (isFirstMixedRound()) { + totalDocsCountMixed = NUM_DOCS_PER_ROUND; + validateTestIndexOnUpgrade(totalDocsCountMixed, modelId); + addDocument(getIndexNameForTest(), "1", TEST_FIELD, TEXT_MIXED, null, null); + } else { + totalDocsCountMixed = 2 * NUM_DOCS_PER_ROUND; + validateTestIndexOnUpgrade(totalDocsCountMixed, modelId); + } + break; + case UPGRADED: + try { + modelId = getModelId(getIngestionPipeline(PIPELINE_NAME), TEXT_EMBEDDING_PROCESSOR); + int totalDocsCountUpgraded = 3 * NUM_DOCS_PER_ROUND; + loadModel(modelId); + addDocument(getIndexNameForTest(), "2", TEST_FIELD, TEXT_UPGRADED, null, null); + validateTestIndexOnUpgrade(totalDocsCountUpgraded, modelId); + } finally { + wipeOfTestResources(getIndexNameForTest(), PIPELINE_NAME, modelId, SEARCH_PIPELINE_NAME); + } + break; + default: + throw new IllegalStateException("Unexpected value: " + getClusterType()); + } + } + + private void validateTestIndexOnUpgrade(final int numberOfDocs, final String modelId) throws Exception { + int docCount = getDocCount(getIndexNameForTest()); + assertEquals(numberOfDocs, docCount); + loadModel(modelId); + HybridQueryBuilder hybridQueryBuilder = getQueryBuilder(modelId); + Map searchResponseAsMap = search( + getIndexNameForTest(), + hybridQueryBuilder, + null, + 1, + Map.of("search_pipeline", SEARCH_PIPELINE_NAME) + ); + assertNotNull(searchResponseAsMap); + int hits = getHitCount(searchResponseAsMap); + assertEquals(1, hits); + List scoresList = getNormalizationScoreList(searchResponseAsMap); + for (Double score : scoresList) { + assertTrue(0 <= score && score <= 2); + } + } + + private HybridQueryBuilder getQueryBuilder(final String modelId) { + NeuralQueryBuilder neuralQueryBuilder = new NeuralQueryBuilder(); + neuralQueryBuilder.fieldName("passage_embedding"); + neuralQueryBuilder.modelId(modelId); + neuralQueryBuilder.queryText(QUERY); + neuralQueryBuilder.k(5); + + MatchQueryBuilder matchQueryBuilder = new MatchQueryBuilder("text", QUERY); + + HybridQueryBuilder hybridQueryBuilder = new HybridQueryBuilder(); + hybridQueryBuilder.add(matchQueryBuilder); + hybridQueryBuilder.add(neuralQueryBuilder); + + return hybridQueryBuilder; + } +} diff --git a/qa/rolling-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/MultiModalSearchIT.java b/qa/rolling-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/MultiModalSearchIT.java new file mode 100644 index 000000000..b91ec1322 --- /dev/null +++ b/qa/rolling-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/MultiModalSearchIT.java @@ -0,0 +1,83 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.neuralsearch.bwc; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Map; +import static org.opensearch.neuralsearch.TestUtils.NODES_BWC_CLUSTER; +import static org.opensearch.neuralsearch.TestUtils.TEXT_IMAGE_EMBEDDING_PROCESSOR; +import static org.opensearch.neuralsearch.TestUtils.getModelId; +import 
org.opensearch.neuralsearch.query.NeuralQueryBuilder; + +public class MultiModalSearchIT extends AbstractRollingUpgradeTestCase { + private static final String PIPELINE_NAME = "nlp-ingest-pipeline"; + private static final String TEST_FIELD = "passage_text"; + private static final String TEST_IMAGE_FIELD = "passage_image"; + private static final String TEXT = "Hello world"; + private static final String TEXT_MIXED = "Hello world mixed"; + private static final String TEXT_UPGRADED = "Hello world upgraded"; + private static final String TEST_IMAGE_TEXT = "/9j/4AAQSkZJRgABAQAASABIAAD"; + private static final String TEST_IMAGE_TEXT_MIXED = "/9j/4AAQSkZJRgbdwoeicfhoid"; + private static final String TEST_IMAGE_TEXT_UPGRADED = "/9j/4AAQSkZJR8eydhgfwceocvlk"; + + private static final int NUM_DOCS_PER_ROUND = 1; + private static String modelId = ""; + + // Test rolling-upgrade text image embedding processor + // Create Text Image Embedding Processor, Ingestion Pipeline and add document + // Validate processor, pipeline and document count in rolling-upgrade scenario + public void testTextImageEmbeddingProcessor_E2EFlow() throws Exception { + waitForClusterHealthGreen(NODES_BWC_CLUSTER); + switch (getClusterType()) { + case OLD: + modelId = uploadTextImageEmbeddingModel(); + loadModel(modelId); + createPipelineForTextImageProcessor(modelId, PIPELINE_NAME); + createIndexWithConfiguration( + getIndexNameForTest(), + Files.readString(Path.of(classLoader.getResource("processor/IndexMappings.json").toURI())), + PIPELINE_NAME + ); + addDocument(getIndexNameForTest(), "0", TEST_FIELD, TEXT, TEST_IMAGE_FIELD, TEST_IMAGE_TEXT); + break; + case MIXED: + modelId = getModelId(getIngestionPipeline(PIPELINE_NAME), TEXT_IMAGE_EMBEDDING_PROCESSOR); + int totalDocsCountMixed; + if (isFirstMixedRound()) { + totalDocsCountMixed = NUM_DOCS_PER_ROUND; + validateTestIndexOnUpgrade(totalDocsCountMixed, modelId, TEXT, TEST_IMAGE_TEXT); + addDocument(getIndexNameForTest(), "1", TEST_FIELD, TEXT_MIXED, TEST_IMAGE_FIELD, TEST_IMAGE_TEXT_MIXED); + } else { + totalDocsCountMixed = 2 * NUM_DOCS_PER_ROUND; + validateTestIndexOnUpgrade(totalDocsCountMixed, modelId, TEXT_MIXED, TEST_IMAGE_TEXT_MIXED); + } + break; + case UPGRADED: + try { + modelId = getModelId(getIngestionPipeline(PIPELINE_NAME), TEXT_IMAGE_EMBEDDING_PROCESSOR); + int totalDocsCountUpgraded = 3 * NUM_DOCS_PER_ROUND; + loadModel(modelId); + addDocument(getIndexNameForTest(), "2", TEST_FIELD, TEXT_UPGRADED, TEST_IMAGE_FIELD, TEST_IMAGE_TEXT_UPGRADED); + validateTestIndexOnUpgrade(totalDocsCountUpgraded, modelId, TEXT_UPGRADED, TEST_IMAGE_TEXT_UPGRADED); + } finally { + wipeOfTestResources(getIndexNameForTest(), PIPELINE_NAME, modelId, null); + } + break; + default: + throw new IllegalStateException("Unexpected value: " + getClusterType()); + } + } + + private void validateTestIndexOnUpgrade(final int numberOfDocs, final String modelId, final String text, final String imageText) + throws Exception { + int docCount = getDocCount(getIndexNameForTest()); + assertEquals(numberOfDocs, docCount); + loadModel(modelId); + NeuralQueryBuilder neuralQueryBuilder = new NeuralQueryBuilder("passage_embedding", text, imageText, modelId, 1, null, null); + Map response = search(getIndexNameForTest(), neuralQueryBuilder, 1); + assertNotNull(response); + } +} diff --git a/qa/rolling-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/NeuralSparseSearchIT.java b/qa/rolling-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/NeuralSparseSearchIT.java new file mode 100644 index
000000000..70513686b --- /dev/null +++ b/qa/rolling-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/NeuralSparseSearchIT.java @@ -0,0 +1,120 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.neuralsearch.bwc; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; +import java.util.Map; +import org.opensearch.index.query.BoolQueryBuilder; +import org.opensearch.index.query.MatchQueryBuilder; +import org.opensearch.neuralsearch.TestUtils; +import static org.opensearch.neuralsearch.TestUtils.NODES_BWC_CLUSTER; +import static org.opensearch.neuralsearch.TestUtils.SPARSE_ENCODING_PROCESSOR; +import static org.opensearch.neuralsearch.TestUtils.objectToFloat; +import static org.opensearch.neuralsearch.TestUtils.getModelId; +import org.opensearch.neuralsearch.query.NeuralSparseQueryBuilder; + +public class NeuralSparseSearchIT extends AbstractRollingUpgradeTestCase { + private static final String PIPELINE_NAME = "nlp-ingest-pipeline-sparse"; + private static final String TEST_SPARSE_ENCODING_FIELD = "passage_embedding"; + private static final String TEST_TEXT_FIELD = "passage_text"; + private static final String TEXT = "Hello world a b"; + private static final String TEXT_MIXED = "Hello planet"; + private static final String TEXT_UPGRADED = "Hello earth"; + private static final String QUERY = "Hi world"; + private static final List TEST_TOKENS_1 = List.of("hello", "world", "a", "b", "c"); + private static final List TEST_TOKENS_2 = List.of("hello", "planet", "a", "b", "c"); + private static final List TEST_TOKENS_3 = List.of("hello", "earth", "a", "b", "c"); + private final Map testRankFeaturesDoc1 = TestUtils.createRandomTokenWeightMap(TEST_TOKENS_1); + private final Map testRankFeaturesDoc2 = TestUtils.createRandomTokenWeightMap(TEST_TOKENS_2); + private final Map testRankFeaturesDoc3 = TestUtils.createRandomTokenWeightMap(TEST_TOKENS_3); + private static final int NUM_DOCS_PER_ROUND = 1; + private static String modelId = ""; + + // Test rolling-upgrade sparse encoding processor + // Create Sparse Encoding Processor, Ingestion Pipeline and add document + // Validate processor, pipeline and document count in rolling-upgrade scenario + public void testSparseEncodingProcessor_E2EFlow() throws Exception { + waitForClusterHealthGreen(NODES_BWC_CLUSTER); + switch (getClusterType()) { + case OLD: + modelId = uploadSparseEncodingModel(); + loadModel(modelId); + createPipelineForSparseEncodingProcessor(modelId, PIPELINE_NAME); + createIndexWithConfiguration( + getIndexNameForTest(), + Files.readString(Path.of(classLoader.getResource("processor/SparseIndexMappings.json").toURI())), + PIPELINE_NAME + ); + addSparseEncodingDoc( + getIndexNameForTest(), + "0", + List.of(TEST_SPARSE_ENCODING_FIELD), + List.of(testRankFeaturesDoc1), + List.of(TEST_TEXT_FIELD), + List.of(TEXT) + ); + break; + case MIXED: + modelId = getModelId(getIngestionPipeline(PIPELINE_NAME), SPARSE_ENCODING_PROCESSOR); + int totalDocsCountMixed; + if (isFirstMixedRound()) { + totalDocsCountMixed = NUM_DOCS_PER_ROUND; + validateTestIndexOnUpgrade(totalDocsCountMixed, modelId); + addSparseEncodingDoc( + getIndexNameForTest(), + "1", + List.of(TEST_SPARSE_ENCODING_FIELD), + List.of(testRankFeaturesDoc2), + List.of(TEST_TEXT_FIELD), + List.of(TEXT_MIXED) + ); + } else { + totalDocsCountMixed = 2 * NUM_DOCS_PER_ROUND; + validateTestIndexOnUpgrade(totalDocsCountMixed, modelId); + } + break; + case UPGRADED: + try { + modelId =
getModelId(getIngestionPipeline(PIPELINE_NAME), SPARSE_ENCODING_PROCESSOR); + int totalDocsCountUpgraded = 3 * NUM_DOCS_PER_ROUND; + loadModel(modelId); + addSparseEncodingDoc( + getIndexNameForTest(), + "2", + List.of(TEST_SPARSE_ENCODING_FIELD), + List.of(testRankFeaturesDoc3), + List.of(TEST_TEXT_FIELD), + List.of(TEXT_UPGRADED) + ); + validateTestIndexOnUpgrade(totalDocsCountUpgraded, modelId); + } finally { + wipeOfTestResources(getIndexNameForTest(), PIPELINE_NAME, modelId, null); + } + break; + default: + throw new IllegalStateException("Unexpected value: " + getClusterType()); + } + } + + private void validateTestIndexOnUpgrade(final int numberOfDocs, final String modelId) throws Exception { + int docCount = getDocCount(getIndexNameForTest()); + assertEquals(numberOfDocs, docCount); + loadModel(modelId); + BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder(); + NeuralSparseQueryBuilder sparseEncodingQueryBuilder = new NeuralSparseQueryBuilder().fieldName(TEST_SPARSE_ENCODING_FIELD) + .queryText(TEXT) + .modelId(modelId); + MatchQueryBuilder matchQueryBuilder = new MatchQueryBuilder(TEST_TEXT_FIELD, TEXT); + boolQueryBuilder.should(sparseEncodingQueryBuilder).should(matchQueryBuilder); + Map response = search(getIndexNameForTest(), boolQueryBuilder, 1); + Map firstInnerHit = getFirstInnerHit(response); + + assertEquals("0", firstInnerHit.get("_id")); + float minExpectedScore = computeExpectedScore(modelId, testRankFeaturesDoc1, TEXT); + assertTrue(minExpectedScore < objectToFloat(firstInnerHit.get("_score"))); + } +} diff --git a/qa/rolling-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/SemanticSearchIT.java b/qa/rolling-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/SemanticSearchIT.java index 41d21aa3f..51e548474 100644 --- a/qa/rolling-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/SemanticSearchIT.java +++ b/qa/rolling-upgrade/src/test/java/org/opensearch/neuralsearch/bwc/SemanticSearchIT.java @@ -4,13 +4,12 @@ */ package org.opensearch.neuralsearch.bwc; -import com.carrotsearch.randomizedtesting.RandomizedTest; import java.nio.file.Files; import java.nio.file.Path; import java.util.Map; -import org.opensearch.neuralsearch.TestUtils; import static org.opensearch.neuralsearch.TestUtils.NODES_BWC_CLUSTER; import static org.opensearch.neuralsearch.TestUtils.TEXT_EMBEDDING_PROCESSOR; +import static org.opensearch.neuralsearch.TestUtils.getModelId; import org.opensearch.neuralsearch.query.NeuralQueryBuilder; public class SemanticSearchIT extends AbstractRollingUpgradeTestCase { @@ -20,15 +19,16 @@ public class SemanticSearchIT extends AbstractRollingUpgradeTestCase { private static final String TEXT_MIXED = "Hello world mixed"; private static final String TEXT_UPGRADED = "Hello world upgraded"; private static final int NUM_DOCS_PER_ROUND = 1; + private static String modelId = ""; // Test rolling-upgrade Semantic Search // Create Text Embedding Processor, Ingestion Pipeline and add document // Validate process , pipeline and document count in rolling-upgrade scenario - public void testTextEmbeddingProcessor_E2EFlow() throws Exception { + public void testSemanticSearch_E2EFlow() throws Exception { waitForClusterHealthGreen(NODES_BWC_CLUSTER); switch (getClusterType()) { case OLD: - String modelId = uploadTextEmbeddingModel(); + modelId = uploadTextEmbeddingModel(); loadModel(modelId); createPipelineProcessor(modelId, PIPELINE_NAME); createIndexWithConfiguration( @@ -36,36 +36,38 @@ public void testTextEmbeddingProcessor_E2EFlow() throws Exception { 
Files.readString(Path.of(classLoader.getResource("processor/IndexMappings.json").toURI())), PIPELINE_NAME ); - addDocument(getIndexNameForTest(), "0", TEST_FIELD, TEXT); + addDocument(getIndexNameForTest(), "0", TEST_FIELD, TEXT, null, null); break; case MIXED: - modelId = getModelId(PIPELINE_NAME); + modelId = getModelId(getIngestionPipeline(PIPELINE_NAME), TEXT_EMBEDDING_PROCESSOR); int totalDocsCountMixed; if (isFirstMixedRound()) { totalDocsCountMixed = NUM_DOCS_PER_ROUND; validateTestIndexOnUpgrade(totalDocsCountMixed, modelId, TEXT); - addDocument(getIndexNameForTest(), "1", TEST_FIELD, TEXT_MIXED); - + addDocument(getIndexNameForTest(), "1", TEST_FIELD, TEXT_MIXED, null, null); } else { totalDocsCountMixed = 2 * NUM_DOCS_PER_ROUND; validateTestIndexOnUpgrade(totalDocsCountMixed, modelId, TEXT_MIXED); } break; case UPGRADED: - modelId = getModelId(PIPELINE_NAME); - int totalDocsCountUpgraded = 3 * NUM_DOCS_PER_ROUND; - loadModel(modelId); - addDocument(getIndexNameForTest(), "2", TEST_FIELD, TEXT_UPGRADED); - validateTestIndexOnUpgrade(totalDocsCountUpgraded, modelId, TEXT_UPGRADED); - deletePipeline(PIPELINE_NAME); - deleteModel(modelId); - deleteIndex(getIndexNameForTest()); + try { + modelId = getModelId(getIngestionPipeline(PIPELINE_NAME), TEXT_EMBEDDING_PROCESSOR); + int totalDocsCountUpgraded = 3 * NUM_DOCS_PER_ROUND; + loadModel(modelId); + addDocument(getIndexNameForTest(), "2", TEST_FIELD, TEXT_UPGRADED, null, null); + validateTestIndexOnUpgrade(totalDocsCountUpgraded, modelId, TEXT_UPGRADED); + } finally { + wipeOfTestResources(getIndexNameForTest(), PIPELINE_NAME, modelId, null); + } break; + default: + throw new IllegalStateException("Unexpected value: " + getClusterType()); } } - private void validateTestIndexOnUpgrade(int numberOfDocs, String modelId, String text) throws Exception { + private void validateTestIndexOnUpgrade(final int numberOfDocs, final String modelId, final String text) throws Exception { int docCount = getDocCount(getIndexNameForTest()); assertEquals(numberOfDocs, docCount); loadModel(modelId); @@ -77,30 +79,4 @@ private void validateTestIndexOnUpgrade(int numberOfDocs, String modelId, String Map response = search(getIndexNameForTest(), neuralQueryBuilder, 1); assertNotNull(response); } - - private String uploadTextEmbeddingModel() throws Exception { - String requestBody = Files.readString(Path.of(classLoader.getResource("processor/UploadModelRequestBody.json").toURI())); - return registerModelGroupAndGetModelId(requestBody); - } - - private String registerModelGroupAndGetModelId(String requestBody) throws Exception { - String modelGroupRegisterRequestBody = Files.readString( - Path.of(classLoader.getResource("processor/CreateModelGroupRequestBody.json").toURI()) - ); - String modelGroupId = registerModelGroup( - String.format(LOCALE, modelGroupRegisterRequestBody, "public_model_" + RandomizedTest.randomAsciiAlphanumOfLength(8)) - ); - return uploadModel(String.format(LOCALE, requestBody, modelGroupId)); - } - - protected void createPipelineProcessor(String modelId, String pipelineName) throws Exception { - String requestBody = Files.readString(Path.of(classLoader.getResource("processor/PipelineConfiguration.json").toURI())); - createPipelineProcessor(requestBody, pipelineName, modelId); - } - - private String getModelId(String pipelineName) { - Map pipeline = getIngestionPipeline(pipelineName); - assertNotNull(pipeline); - return TestUtils.getModelId(pipeline, TEXT_EMBEDDING_PROCESSOR); - } } diff --git 
a/qa/rolling-upgrade/src/test/resources/processor/IndexMappings.json b/qa/rolling-upgrade/src/test/resources/processor/IndexMappings.json
index fc4c13646..86675a4d2 100644
--- a/qa/rolling-upgrade/src/test/resources/processor/IndexMappings.json
+++ b/qa/rolling-upgrade/src/test/resources/processor/IndexMappings.json
@@ -2,7 +2,6 @@
   "settings": {
     "index": {
       "knn": true,
-      "knn.algo_param.ef_search": 100,
       "refresh_interval": "30s",
       "default_pipeline": "%s"
     },
@@ -26,6 +25,9 @@
       },
       "passage_text": {
         "type": "text"
+      },
+      "passage_image": {
+        "type": "text"
       }
     }
   }
diff --git a/qa/rolling-upgrade/src/test/resources/processor/PipelineForSparseEncodingProcessorConfiguration.json b/qa/rolling-upgrade/src/test/resources/processor/PipelineForSparseEncodingProcessorConfiguration.json
new file mode 100644
index 000000000..d9a358c24
--- /dev/null
+++ b/qa/rolling-upgrade/src/test/resources/processor/PipelineForSparseEncodingProcessorConfiguration.json
@@ -0,0 +1,13 @@
+{
+  "description": "A sparse encoding ingest pipeline",
+  "processors": [
+    {
+      "sparse_encoding": {
+        "model_id": "%s",
+        "field_map": {
+          "passage_text": "passage_embedding"
+        }
+      }
+    }
+  ]
+ }
diff --git a/qa/rolling-upgrade/src/test/resources/processor/PipelineForTextImageProcessorConfiguration.json b/qa/rolling-upgrade/src/test/resources/processor/PipelineForTextImageProcessorConfiguration.json
new file mode 100644
index 000000000..60d5dc051
--- /dev/null
+++ b/qa/rolling-upgrade/src/test/resources/processor/PipelineForTextImageProcessorConfiguration.json
@@ -0,0 +1,15 @@
+{
+  "description": "text image embedding pipeline",
+  "processors": [
+    {
+      "text_image_embedding": {
+        "model_id": "%s",
+        "embedding": "passage_embedding",
+        "field_map": {
+          "text": "passage_text",
+          "image": "passage_image"
+        }
+      }
+    }
+  ]
+}
diff --git a/qa/rolling-upgrade/src/test/resources/processor/SparseIndexMappings.json b/qa/rolling-upgrade/src/test/resources/processor/SparseIndexMappings.json
new file mode 100644
index 000000000..ffe2447c9
--- /dev/null
+++ b/qa/rolling-upgrade/src/test/resources/processor/SparseIndexMappings.json
@@ -0,0 +1,17 @@
+{
+  "settings": {
+    "default_pipeline": "%s",
+    "number_of_shards": 3,
+    "number_of_replicas": 1
+  },
+  "mappings": {
+    "properties": {
+      "passage_embedding": {
+        "type": "rank_features"
+      },
+      "passage_text": {
+        "type": "text"
+      }
+    }
+  }
+}
diff --git a/qa/rolling-upgrade/src/test/resources/processor/UploadSparseEncodingModelRequestBody.json b/qa/rolling-upgrade/src/test/resources/processor/UploadSparseEncodingModelRequestBody.json
new file mode 100644
index 000000000..f64439549
--- /dev/null
+++ b/qa/rolling-upgrade/src/test/resources/processor/UploadSparseEncodingModelRequestBody.json
@@ -0,0 +1,10 @@
+{
+    "name": "tokenize-idf-0915",
+    "version": "1.0.0",
+    "function_name": "SPARSE_TOKENIZE",
+    "description": "test model",
+    "model_format": "TORCH_SCRIPT",
+    "model_group_id": "%s",
+    "model_content_hash_value": "b345e9e943b62c405a8dd227ef2c46c84c5ff0a0b71b584be9132b37bce91a9a",
+    "url": "https://github.com/opensearch-project/ml-commons/raw/main/ml-algorithms/src/test/resources/org/opensearch/ml/engine/algorithms/sparse_encoding/sparse_demo.zip"
+}
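Editor's note (not part of the patch): each "%s" in the JSON resources above is a placeholder that the test fixtures fill in with String.format before the request is sent -- the index mappings receive the ingest pipeline name for "default_pipeline", the pipeline configurations receive the model id, and the model upload body receives the model group id. A minimal sketch of that pattern, mirroring the createPipelineProcessor/uploadModel code in this patch (variable names are illustrative):

    // Illustrative only: load the sparse encoding pipeline template and substitute the model id.
    String template = Files.readString(
        Path.of(classLoader.getResource("processor/PipelineForSparseEncodingProcessorConfiguration.json").toURI())
    );
    String pipelineBody = String.format(Locale.ROOT, template, modelId); // fills the "%s" under "model_id"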
diff --git a/src/test/java/org/opensearch/neuralsearch/processor/NeuralQueryEnricherProcessorIT.java b/src/test/java/org/opensearch/neuralsearch/processor/NeuralQueryEnricherProcessorIT.java
index b37db7c82..803f46918 100644
--- a/src/test/java/org/opensearch/neuralsearch/processor/NeuralQueryEnricherProcessorIT.java
+++ b/src/test/java/org/opensearch/neuralsearch/processor/NeuralQueryEnricherProcessorIT.java
@@ -51,7 +51,7 @@ public void testNeuralQueryEnricherProcessor_whenNoModelIdPassed_thenSuccess() {
         initializeIndexIfNotExist();
         String modelId = getDeployedModelId();
         createSearchRequestProcessor(modelId, search_pipeline);
-        createPipelineProcessor(modelId, ingest_pipeline);
+        createPipelineProcessor(modelId, ingest_pipeline, ProcessorType.TEXT_EMBEDDING);
         updateIndexSettings(index, Settings.builder().put("index.search.default_pipeline", search_pipeline));
         NeuralQueryBuilder neuralQueryBuilder = new NeuralQueryBuilder();
         neuralQueryBuilder.fieldName(TEST_KNN_VECTOR_FIELD_NAME_1);
@@ -68,7 +68,7 @@ public void testNeuralQueryEnricherProcessor_whenHybridQueryBuilderAndNoModelIdP
         initializeIndexIfNotExist();
         String modelId = getDeployedModelId();
         createSearchRequestProcessor(modelId, search_pipeline);
-        createPipelineProcessor(modelId, ingest_pipeline);
+        createPipelineProcessor(modelId, ingest_pipeline, ProcessorType.TEXT_EMBEDDING);
         updateIndexSettings(index, Settings.builder().put("index.search.default_pipeline", search_pipeline));
         NeuralQueryBuilder neuralQueryBuilder = new NeuralQueryBuilder();
         neuralQueryBuilder.fieldName(TEST_KNN_VECTOR_FIELD_NAME_1);
diff --git a/src/test/java/org/opensearch/neuralsearch/processor/ScoreCombinationIT.java b/src/test/java/org/opensearch/neuralsearch/processor/ScoreCombinationIT.java
index 832f008b8..1277c0f09 100644
--- a/src/test/java/org/opensearch/neuralsearch/processor/ScoreCombinationIT.java
+++ b/src/test/java/org/opensearch/neuralsearch/processor/ScoreCombinationIT.java
@@ -25,6 +25,9 @@
 import org.opensearch.neuralsearch.query.HybridQueryBuilder;
 import org.opensearch.neuralsearch.query.NeuralQueryBuilder;
 
+import static org.opensearch.neuralsearch.TestUtils.DEFAULT_NORMALIZATION_METHOD;
+import static org.opensearch.neuralsearch.TestUtils.DEFAULT_COMBINATION_METHOD;
+import static org.opensearch.neuralsearch.TestUtils.PARAM_NAME_WEIGHTS;
 import com.google.common.primitives.Floats;
 import lombok.SneakyThrows;
 
diff --git a/src/test/java/org/opensearch/neuralsearch/processor/ScoreNormalizationIT.java b/src/test/java/org/opensearch/neuralsearch/processor/ScoreNormalizationIT.java
index 9910435b8..35eabb8a3 100644
--- a/src/test/java/org/opensearch/neuralsearch/processor/ScoreNormalizationIT.java
+++ b/src/test/java/org/opensearch/neuralsearch/processor/ScoreNormalizationIT.java
@@ -21,6 +21,9 @@
 import org.opensearch.neuralsearch.query.HybridQueryBuilder;
 import org.opensearch.neuralsearch.query.NeuralQueryBuilder;
 
+import static org.opensearch.neuralsearch.TestUtils.DEFAULT_NORMALIZATION_METHOD;
+import static org.opensearch.neuralsearch.TestUtils.DEFAULT_COMBINATION_METHOD;
+import static org.opensearch.neuralsearch.TestUtils.PARAM_NAME_WEIGHTS;
 import com.google.common.primitives.Floats;
 import lombok.SneakyThrows;
 
diff --git a/src/test/java/org/opensearch/neuralsearch/processor/SparseEncodingProcessIT.java b/src/test/java/org/opensearch/neuralsearch/processor/SparseEncodingProcessIT.java
index 416707956..226ed01b8 100644
--- a/src/test/java/org/opensearch/neuralsearch/processor/SparseEncodingProcessIT.java
+++ b/src/test/java/org/opensearch/neuralsearch/processor/SparseEncodingProcessIT.java
@@ -15,13 +15,13 @@
 import org.opensearch.client.Response;
 import org.opensearch.common.xcontent.XContentHelper;
 import org.opensearch.common.xcontent.XContentType;
-import org.opensearch.neuralsearch.BaseSparseEncodingIT;
+import
org.opensearch.neuralsearch.BaseNeuralSearchIT; import com.google.common.collect.ImmutableList; import lombok.SneakyThrows; -public class SparseEncodingProcessIT extends BaseSparseEncodingIT { +public class SparseEncodingProcessIT extends BaseNeuralSearchIT { private static final String INDEX_NAME = "sparse_encoding_index"; @@ -39,7 +39,7 @@ public void tearDown() { } public void testSparseEncodingProcessor() throws Exception { - String modelId = prepareModel(); + String modelId = prepareSparseEncodingModel(); createPipelineProcessor(modelId, PIPELINE_NAME, ProcessorType.SPARSE_ENCODING); createSparseEncodingIndex(); ingestDocument(); diff --git a/src/test/java/org/opensearch/neuralsearch/processor/TextEmbeddingProcessorIT.java b/src/test/java/org/opensearch/neuralsearch/processor/TextEmbeddingProcessorIT.java index 97d1c857c..410b399ff 100644 --- a/src/test/java/org/opensearch/neuralsearch/processor/TextEmbeddingProcessorIT.java +++ b/src/test/java/org/opensearch/neuralsearch/processor/TextEmbeddingProcessorIT.java @@ -41,7 +41,7 @@ public void tearDown() { public void testTextEmbeddingProcessor() throws Exception { String modelId = uploadTextEmbeddingModel(); loadModel(modelId); - createPipelineProcessor(modelId, PIPELINE_NAME); + createPipelineProcessor(modelId, PIPELINE_NAME, ProcessorType.TEXT_EMBEDDING); createTextEmbeddingIndex(); ingestDocument(); assertEquals(1, getDocCount(INDEX_NAME)); diff --git a/src/test/java/org/opensearch/neuralsearch/query/NeuralSparseQueryIT.java b/src/test/java/org/opensearch/neuralsearch/query/NeuralSparseQueryIT.java index afabd46c9..4ca8f2186 100644 --- a/src/test/java/org/opensearch/neuralsearch/query/NeuralSparseQueryIT.java +++ b/src/test/java/org/opensearch/neuralsearch/query/NeuralSparseQueryIT.java @@ -4,6 +4,7 @@ */ package org.opensearch.neuralsearch.query; +import org.opensearch.neuralsearch.BaseNeuralSearchIT; import static org.opensearch.neuralsearch.TestUtils.objectToFloat; import java.util.List; @@ -15,12 +16,11 @@ import org.opensearch.index.query.BoolQueryBuilder; import org.opensearch.index.query.MatchAllQueryBuilder; import org.opensearch.index.query.MatchQueryBuilder; -import org.opensearch.neuralsearch.BaseSparseEncodingIT; import org.opensearch.neuralsearch.TestUtils; import lombok.SneakyThrows; -public class NeuralSparseQueryIT extends BaseSparseEncodingIT { +public class NeuralSparseQueryIT extends BaseNeuralSearchIT { private static final String TEST_BASIC_INDEX_NAME = "test-sparse-basic-index"; private static final String TEST_MULTI_NEURAL_SPARSE_FIELD_INDEX_NAME = "test-sparse-multi-field-index"; private static final String TEST_TEXT_AND_NEURAL_SPARSE_FIELD_INDEX_NAME = "test-sparse-text-and-field-index"; @@ -40,7 +40,7 @@ public class NeuralSparseQueryIT extends BaseSparseEncodingIT { public void setUp() throws Exception { super.setUp(); updateClusterSettings(); - prepareModel(); + prepareSparseEncodingModel(); } @After diff --git a/src/testFixtures/java/org/opensearch/neuralsearch/BaseNeuralSearchIT.java b/src/testFixtures/java/org/opensearch/neuralsearch/BaseNeuralSearchIT.java index 680d90b65..786d96acf 100644 --- a/src/testFixtures/java/org/opensearch/neuralsearch/BaseNeuralSearchIT.java +++ b/src/testFixtures/java/org/opensearch/neuralsearch/BaseNeuralSearchIT.java @@ -4,6 +4,7 @@ */ package org.opensearch.neuralsearch; +import org.opensearch.ml.common.model.MLModelState; import static org.opensearch.neuralsearch.common.VectorUtil.vectorAsListToArray; import java.io.IOException; @@ -17,6 +18,7 @@ import 
java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.ArrayList; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -46,6 +48,7 @@ import org.opensearch.index.query.QueryBuilder; import org.opensearch.knn.index.SpaceType; import org.opensearch.neuralsearch.util.NeuralSearchClusterUtil; +import org.opensearch.neuralsearch.util.TokenWeightUtil; import org.opensearch.test.ClusterServiceUtils; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; @@ -54,6 +57,15 @@ import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope; import com.google.common.collect.ImmutableList; +import static org.opensearch.neuralsearch.TestUtils.MAX_TASK_RESULT_QUERY_TIME_IN_SECOND; +import static org.opensearch.neuralsearch.TestUtils.DEFAULT_TASK_RESULT_QUERY_INTERVAL_IN_MILLISECOND; +import static org.opensearch.neuralsearch.TestUtils.DEFAULT_USER_AGENT; +import static org.opensearch.neuralsearch.TestUtils.DEFAULT_NORMALIZATION_METHOD; +import static org.opensearch.neuralsearch.TestUtils.DEFAULT_COMBINATION_METHOD; +import static org.opensearch.neuralsearch.TestUtils.PARAM_NAME_WEIGHTS; +import static org.opensearch.neuralsearch.TestUtils.MAX_RETRY; +import static org.opensearch.neuralsearch.TestUtils.MAX_TIME_OUT_INTERVAL; + import lombok.AllArgsConstructor; import lombok.Getter; import lombok.SneakyThrows; @@ -63,14 +75,6 @@ public abstract class BaseNeuralSearchIT extends OpenSearchSecureRestTestCase { protected static final Locale LOCALE = Locale.ROOT; - private static final int MAX_TASK_RESULT_QUERY_TIME_IN_SECOND = 60 * 5; - - private static final int DEFAULT_TASK_RESULT_QUERY_INTERVAL_IN_MILLISECOND = 1000; - protected static final String DEFAULT_USER_AGENT = "Kibana"; - protected static final String DEFAULT_NORMALIZATION_METHOD = "min_max"; - protected static final String DEFAULT_COMBINATION_METHOD = "arithmetic_mean"; - protected static final String PARAM_NAME_WEIGHTS = "weights"; - protected static final Map PIPELINE_CONFIGS_BY_TYPE = Map.of( ProcessorType.TEXT_EMBEDDING, "processor/PipelineConfiguration.json", @@ -115,7 +119,7 @@ protected void updateClusterSettings() { } @SneakyThrows - protected void updateClusterSettings(String settingKey, Object value) { + protected void updateClusterSettings(final String settingKey, final Object value) { XContentBuilder builder = XContentFactory.jsonBuilder() .startObject() .startObject("persistent") @@ -134,13 +138,13 @@ protected void updateClusterSettings(String settingKey, Object value) { assertEquals(RestStatus.OK, RestStatus.fromCode(response.getStatusLine().getStatusCode())); } - protected String registerModelGroupAndUploadModel(String requestBody) throws Exception { + protected String registerModelGroupAndUploadModel(final String requestBody) throws Exception { String modelGroupId = getModelGroupId(); // model group id is dynamically generated, we need to update model update request body after group is registered return uploadModel(String.format(LOCALE, requestBody, modelGroupId)); } - protected String uploadModel(String requestBody) throws Exception { + protected String uploadModel(final String requestBody) throws Exception { Response uploadResponse = makeRequest( client(), "POST", @@ -169,7 +173,7 @@ protected String uploadModel(String requestBody) throws Exception { return modelId; } - protected void loadModel(String modelId) throws Exception { + protected void loadModel(final String modelId) throws Exception { Response uploadResponse = 
makeRequest( client(), "POST", @@ -208,6 +212,21 @@ protected String prepareModel() { return modelId; } + /** + * Upload default model and load into the cluster + * + * @return modelID + */ + @SneakyThrows + protected String prepareSparseEncodingModel() { + String requestBody = Files.readString( + Path.of(classLoader.getResource("processor/UploadSparseEncodingModelRequestBody.json").toURI()) + ); + String modelId = registerModelGroupAndUploadModel(requestBody); + loadModel(modelId); + return modelId; + } + /** * Execute model inference on the provided query text * @@ -217,7 +236,7 @@ protected String prepareModel() { */ @SuppressWarnings("unchecked") @SneakyThrows - protected float[] runInference(String modelId, String queryText) { + protected float[] runInference(final String modelId, final String queryText) { Response inferenceResponse = makeRequest( client(), "POST", @@ -245,7 +264,8 @@ protected float[] runInference(String modelId, String queryText) { return vectorAsListToArray(data); } - protected void createIndexWithConfiguration(String indexName, String indexConfiguration, String pipelineName) throws Exception { + protected void createIndexWithConfiguration(final String indexName, String indexConfiguration, final String pipelineName) + throws Exception { if (StringUtils.isNotBlank(pipelineName)) { indexConfiguration = String.format(LOCALE, indexConfiguration, pipelineName); } @@ -266,16 +286,13 @@ protected void createIndexWithConfiguration(String indexName, String indexConfig assertEquals(indexName, node.get("index").toString()); } - protected void createPipelineProcessor(String modelId, String pipelineName) throws Exception { - createPipelineProcessor(modelId, pipelineName, ProcessorType.TEXT_EMBEDDING); - } - - protected void createPipelineProcessor(String modelId, String pipelineName, ProcessorType processorType) throws Exception { + protected void createPipelineProcessor(final String modelId, final String pipelineName, final ProcessorType processorType) + throws Exception { String requestBody = Files.readString(Path.of(classLoader.getResource(PIPELINE_CONFIGS_BY_TYPE.get(processorType)).toURI())); createPipelineProcessor(requestBody, pipelineName, modelId); } - protected void createPipelineProcessor(String requestBody, String pipelineName, String modelId) throws Exception { + protected void createPipelineProcessor(final String requestBody, final String pipelineName, final String modelId) throws Exception { Response pipelineCreateResponse = makeRequest( client(), "PUT", @@ -292,7 +309,7 @@ protected void createPipelineProcessor(String requestBody, String pipelineName, assertEquals("true", node.get("acknowledged").toString()); } - protected void createSearchRequestProcessor(String modelId, String pipelineName) throws Exception { + protected void createSearchRequestProcessor(final String modelId, final String pipelineName) throws Exception { Response pipelineCreateResponse = makeRequest( client(), "PUT", @@ -322,7 +339,7 @@ protected void createSearchRequestProcessor(String modelId, String pipelineName) * @return number of documents indexed to that index */ @SneakyThrows - protected int getDocCount(String indexName) { + protected int getDocCount(final String indexName) { Request request = new Request("GET", "/" + indexName + "/_count"); Response response = client().performRequest(request); assertEquals(request.getEndpoint() + ": failed", RestStatus.OK, RestStatus.fromCode(response.getStatusLine().getStatusCode())); @@ -339,7 +356,7 @@ protected int getDocCount(String indexName) { 
* @param resultSize number of results to return in the search * @return Search results represented as a map */ - protected Map search(String index, QueryBuilder queryBuilder, int resultSize) { + protected Map search(final String index, final QueryBuilder queryBuilder, final int resultSize) { return search(index, queryBuilder, null, resultSize); } @@ -353,7 +370,12 @@ protected Map search(String index, QueryBuilder queryBuilder, in * @return Search results represented as a map */ @SneakyThrows - protected Map search(String index, QueryBuilder queryBuilder, QueryBuilder rescorer, int resultSize) { + protected Map search( + final String index, + final QueryBuilder queryBuilder, + final QueryBuilder rescorer, + final int resultSize + ) { return search(index, queryBuilder, rescorer, resultSize, Map.of()); } @@ -369,11 +391,11 @@ protected Map search(String index, QueryBuilder queryBuilder, Qu */ @SneakyThrows protected Map search( - String index, - QueryBuilder queryBuilder, - QueryBuilder rescorer, - int resultSize, - Map requestParams + final String index, + final QueryBuilder queryBuilder, + final QueryBuilder rescorer, + final int resultSize, + final Map requestParams ) { XContentBuilder builder = XContentFactory.jsonBuilder().startObject().field("query"); queryBuilder.toXContent(builder, ToXContent.EMPTY_PARAMS); @@ -409,18 +431,18 @@ protected Map search( * @param vectorFieldNames List of vectir fields to be added * @param vectors List of vectors corresponding to those fields */ - protected void addKnnDoc(String index, String docId, List vectorFieldNames, List vectors) { + protected void addKnnDoc(final String index, final String docId, final List vectorFieldNames, final List vectors) { addKnnDoc(index, docId, vectorFieldNames, vectors, Collections.emptyList(), Collections.emptyList()); } @SneakyThrows protected void addKnnDoc( - String index, - String docId, - List vectorFieldNames, - List vectors, - List textFieldNames, - List texts + final String index, + final String docId, + final List vectorFieldNames, + final List vectors, + final List textFieldNames, + final List texts ) { addKnnDoc(index, docId, vectorFieldNames, vectors, textFieldNames, texts, Collections.emptyList(), Collections.emptyList()); } @@ -439,14 +461,14 @@ protected void addKnnDoc( */ @SneakyThrows protected void addKnnDoc( - String index, - String docId, - List vectorFieldNames, - List vectors, - List textFieldNames, - List texts, - List nestedFieldNames, - List> nestedFields + final String index, + final String docId, + final List vectorFieldNames, + final List vectors, + final List textFieldNames, + final List texts, + final List nestedFieldNames, + final List> nestedFields ) { Request request = new Request("POST", "/" + index + "/_doc/" + docId + "?refresh=true"); XContentBuilder builder = XContentFactory.jsonBuilder().startObject(); @@ -474,6 +496,41 @@ protected void addKnnDoc( assertEquals(request.getEndpoint() + ": failed", RestStatus.CREATED, RestStatus.fromCode(response.getStatusLine().getStatusCode())); } + @SneakyThrows + protected void addSparseEncodingDoc( + final String index, + final String docId, + final List fieldNames, + final List> docs + ) { + addSparseEncodingDoc(index, docId, fieldNames, docs, Collections.emptyList(), Collections.emptyList()); + } + + @SneakyThrows + protected void addSparseEncodingDoc( + final String index, + final String docId, + final List fieldNames, + final List> docs, + final List textFieldNames, + final List texts + ) { + Request request = new Request("POST", "/" + index 
+ "/_doc/" + docId + "?refresh=true"); + XContentBuilder builder = XContentFactory.jsonBuilder().startObject(); + for (int i = 0; i < fieldNames.size(); i++) { + builder.field(fieldNames.get(i), docs.get(i)); + } + + for (int i = 0; i < textFieldNames.size(); i++) { + builder.field(textFieldNames.get(i), texts.get(i)); + } + builder.endObject(); + + request.setJsonEntity(builder.toString()); + Response response = client().performRequest(request); + assertEquals(request.getEndpoint() + ": failed", RestStatus.CREATED, RestStatus.fromCode(response.getStatusLine().getStatusCode())); + } + /** * Parse the first returned hit from a search response as a map * @@ -481,7 +538,7 @@ protected void addKnnDoc( * @return Map of first internal hit from the search */ @SuppressWarnings("unchecked") - protected Map getFirstInnerHit(Map searchResponseAsMap) { + protected Map getFirstInnerHit(final Map searchResponseAsMap) { Map hits1map = (Map) searchResponseAsMap.get("hits"); List hits2List = (List) hits1map.get("hits"); assertTrue(hits2List.size() > 0); @@ -495,12 +552,30 @@ protected Map getFirstInnerHit(Map searchRespons * @return number of hits from the search */ @SuppressWarnings("unchecked") - protected int getHitCount(Map searchResponseAsMap) { + protected int getHitCount(final Map searchResponseAsMap) { Map hits1map = (Map) searchResponseAsMap.get("hits"); List hits1List = (List) hits1map.get("hits"); return hits1List.size(); } + /** + * Parse the total number of hits and retrive score from the search + * + * @param searchResponseAsMap Complete search response as a map + * @return number of scores list from the search + */ + @SuppressWarnings("unchecked") + protected List getNormalizationScoreList(final Map searchResponseAsMap) { + Map hits1map = (Map) searchResponseAsMap.get("hits"); + List hitsList = (List) hits1map.get("hits"); + List scores = new ArrayList<>(); + for (Object hit : hitsList) { + Map searchHit = (Map) hit; + scores.add((Double) searchHit.get("_score")); + } + return scores; + } + /** * Create a k-NN index from a list of KNNFieldConfigs * @@ -508,15 +583,28 @@ protected int getHitCount(Map searchResponseAsMap) { * @param knnFieldConfigs list of configs specifying field */ @SneakyThrows - protected void prepareKnnIndex(String indexName, List knnFieldConfigs) { + protected void prepareKnnIndex(final String indexName, final List knnFieldConfigs) { prepareKnnIndex(indexName, knnFieldConfigs, 3); } @SneakyThrows - protected void prepareKnnIndex(String indexName, List knnFieldConfigs, int numOfShards) { + protected void prepareKnnIndex(final String indexName, final List knnFieldConfigs, final int numOfShards) { createIndexWithConfiguration(indexName, buildIndexConfiguration(knnFieldConfigs, numOfShards), ""); } + @SneakyThrows + protected void prepareSparseEncodingIndex(final String indexName, final List sparseEncodingFieldNames) { + XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().startObject().startObject("mappings").startObject("properties"); + + for (String fieldName : sparseEncodingFieldNames) { + xContentBuilder.startObject(fieldName).field("type", "rank_features").endObject(); + } + + xContentBuilder.endObject().endObject().endObject(); + String indexMappings = xContentBuilder.toString(); + createIndexWithConfiguration(indexName, indexMappings, ""); + } + /** * Computes the expected distance between an indexVector and query text without using the neural query type. 
     /**
      * Computes the expected distance between an indexVector and query text without using the neural query type.
     *
     * @param modelId ID of model to run inference
     * @param indexVector vector to compute score against
     * @param spaceType Space type of knn index
@@ -526,12 +614,17 @@
     * @param queryText Text to produce query vector from
     * @return Expected OpenSearch score for this indexVector
     */
-    protected float computeExpectedScore(String modelId, float[] indexVector, SpaceType spaceType, String queryText) {
+    protected float computeExpectedScore(
+        final String modelId,
+        final float[] indexVector,
+        final SpaceType spaceType,
+        final String queryText
+    ) {
         float[] queryVector = runInference(modelId, queryText);
         return spaceType.getVectorSimilarityFunction().compare(queryVector, indexVector);
     }
 
-    protected Map<String, Object> getTaskQueryResponse(String taskId) throws Exception {
+    protected Map<String, Object> getTaskQueryResponse(final String taskId) throws Exception {
         Response taskQueryResponse = makeRequest(
             client(),
             "GET",
@@ -543,7 +636,7 @@
         return XContentHelper.convertToMap(XContentType.JSON.xContent(), EntityUtils.toString(taskQueryResponse.getEntity()), false);
     }
 
-    protected boolean checkComplete(Map<String, Object> node) {
+    protected boolean checkComplete(final Map<String, Object> node) {
         Predicate<Map<String, Object>> predicate = x -> node.get("error") != null || "COMPLETED".equals(String.valueOf(node.get("state")));
         return predicate.test(node);
     }
@@ -650,8 +743,10 @@ protected void deleteModel(String modelId) {
             ImmutableList.of(new BasicHeader(HttpHeaders.USER_AGENT, DEFAULT_USER_AGENT))
         );
 
-        // after model undeploy returns, the max interval to update model status is 3s in ml-commons CronJob.
-        Thread.sleep(3000);
+        // Wait for the model undeploy to complete.
+        // The undeploy action sometimes results in a DEPLOY_FAILED state, but that does not block the model from being deleted,
+        // so both UNDEPLOYED and DEPLOY_FAILED are accepted as exit states.
+        pollForModelState(modelId, Set.of(MLModelState.UNDEPLOYED, MLModelState.DEPLOY_FAILED));
 
         makeRequest(
             client(),
@@ -663,6 +758,46 @@
         );
     }
 
+    protected void pollForModelState(String modelId, Set<MLModelState> exitModelStates) throws InterruptedException {
+        MLModelState currentState = null;
+        for (int i = 0; i < MAX_RETRY; i++) {
+            Thread.sleep(MAX_TIME_OUT_INTERVAL);
+            currentState = getModelState(modelId);
+            if (exitModelStates.contains(currentState)) {
+                return;
+            }
+        }
+        fail(
+            String.format(
+                LOCALE,
+                "Model state did not reach the exit states %s after %d attempts with an interval of %d ms; latest model state: %s.",
+                StringUtils.join(exitModelStates, ","),
+                MAX_RETRY,
+                MAX_TIME_OUT_INTERVAL,
+                currentState
+            )
+        );
+    }
+
+    @SneakyThrows
+    protected MLModelState getModelState(String modelId) {
+        Response getModelResponse = makeRequest(
+            client(),
+            "GET",
+            String.format(LOCALE, "/_plugins/_ml/models/%s", modelId),
+            null,
+            toHttpEntity(""),
+            ImmutableList.of(new BasicHeader(HttpHeaders.USER_AGENT, DEFAULT_USER_AGENT))
+        );
+        Map<String, Object> getModelResponseJson = XContentHelper.convertToMap(
+            XContentType.JSON.xContent(),
+            EntityUtils.toString(getModelResponse.getEntity()),
+            false
+        );
+        String modelState = (String) getModelResponseJson.get("model_state");
+        return MLModelState.valueOf(modelState);
+    }
+
     public boolean isUpdateClusterSettings() {
         return true;
     }
@@ -773,7 +908,7 @@ protected Set<String> findDeployedModels() {
         List<Map<String, Object>> innerHitsMap = (List<Map<String, Object>>) hits.get("hits");
         return innerHitsMap.stream()
             .map(hit -> (Map<String, Object>) hit.get("_source"))
-            .filter(hitsMap -> !Objects.isNull(hitsMap) && hitsMap.containsKey("model_id"))
+            .filter(hitsMap -> Objects.nonNull(hitsMap) && hitsMap.containsKey("model_id"))
            .map(hitsMap -> (String)
hitsMap.get("model_id")) .collect(Collectors.toSet()); } @@ -799,7 +934,7 @@ private String getModelGroupId() { ); } - protected String registerModelGroup(String modelGroupRegisterRequestBody) throws IOException, ParseException { + protected String registerModelGroup(final String modelGroupRegisterRequestBody) throws IOException, ParseException { Response modelGroupResponse = makeRequest( client(), "POST", @@ -819,7 +954,7 @@ protected String registerModelGroup(String modelGroupRegisterRequestBody) throws } // Method that waits till the health of nodes in the cluster goes green - protected void waitForClusterHealthGreen(String numOfNodes) throws IOException { + protected void waitForClusterHealthGreen(final String numOfNodes) throws IOException { Request waitForGreen = new Request("GET", "/_cluster/health"); waitForGreen.addParameter("wait_for_nodes", numOfNodes); waitForGreen.addParameter("wait_for_status", "green"); @@ -833,11 +968,26 @@ protected void waitForClusterHealthGreen(String numOfNodes) throws IOException { * @param docId * @param fieldName name of the field * @param text to be added + * @param imagefieldName name of the image field + * @param imageText name of the image text + * */ - protected void addDocument(String index, String docId, String fieldName, String text) throws IOException { + protected void addDocument( + final String index, + final String docId, + final String fieldName, + final String text, + final String imagefieldName, + final String imageText + ) throws IOException { Request request = new Request("PUT", "/" + index + "/_doc/" + docId + "?refresh=true"); - XContentBuilder builder = XContentFactory.jsonBuilder().startObject().field(fieldName, text).endObject(); + XContentBuilder builder = XContentFactory.jsonBuilder().startObject(); + builder.field(fieldName, text); + if (imagefieldName != null && imageText != null) { + builder.field(imagefieldName, imageText); + } + builder.endObject(); request.setJsonEntity(builder.toString()); client().performRequest(request); } @@ -849,7 +999,7 @@ protected void addDocument(String index, String docId, String fieldName, String * @return get pipeline response as a map object */ @SneakyThrows - protected Map getIngestionPipeline(String pipelineName) { + protected Map getIngestionPipeline(final String pipelineName) { Request request = new Request("GET", "/_ingest/pipeline/" + pipelineName); Response response = client().performRequest(request); assertEquals(request.getEndpoint() + ": failed", RestStatus.OK, RestStatus.fromCode(response.getStatusLine().getStatusCode())); @@ -866,7 +1016,7 @@ protected Map getIngestionPipeline(String pipelineName) { * @return delete pipeline response as a map object */ @SneakyThrows - protected Map deletePipeline(String pipelineName) { + protected Map deletePipeline(final String pipelineName) { Request request = new Request("DELETE", "/_ingest/pipeline/" + pipelineName); Response response = client().performRequest(request); assertEquals(request.getEndpoint() + ": failed", RestStatus.OK, RestStatus.fromCode(response.getStatusLine().getStatusCode())); @@ -875,6 +1025,81 @@ protected Map deletePipeline(String pipelineName) { return responseMap; } + protected float computeExpectedScore(final String modelId, final Map tokenWeightMap, final String queryText) { + Map queryTokens = runSparseModelInference(modelId, queryText); + return computeExpectedScore(tokenWeightMap, queryTokens); + } + + protected float computeExpectedScore(final Map tokenWeightMap, final Map queryTokens) { + Float score = 0f; + 
for (Map.Entry<String, Float> entry : queryTokens.entrySet()) {
+            if (tokenWeightMap.containsKey(entry.getKey())) {
+                score += entry.getValue() * getFeatureFieldCompressedNumber(tokenWeightMap.get(entry.getKey()));
+            }
+        }
+        return score;
+    }
+
+    @SneakyThrows
+    protected Map<String, Float> runSparseModelInference(final String modelId, final String queryText) {
+        Response inferenceResponse = makeRequest(
+            client(),
+            "POST",
+            String.format(LOCALE, "/_plugins/_ml/models/%s/_predict", modelId),
+            null,
+            toHttpEntity(String.format(LOCALE, "{\"text_docs\": [\"%s\"]}", queryText)),
+            ImmutableList.of(new BasicHeader(HttpHeaders.USER_AGENT, DEFAULT_USER_AGENT))
+        );
+
+        Map<String, Object> inferenceResJson = XContentHelper.convertToMap(
+            XContentType.JSON.xContent(),
+            EntityUtils.toString(inferenceResponse.getEntity()),
+            false
+        );
+
+        Object inference_results = inferenceResJson.get("inference_results");
+        assertTrue(inference_results instanceof List);
+        List<Object> inferenceResultsAsMap = (List<Object>) inference_results;
+        assertEquals(1, inferenceResultsAsMap.size());
+        Map<String, Object> result = (Map<String, Object>) inferenceResultsAsMap.get(0);
+        List<Object> output = (List<Object>) result.get("output");
+        assertEquals(1, output.size());
+        Map<String, Object> map = (Map<String, Object>) output.get(0);
+        assertEquals(1, map.size());
+        Map<String, Object> dataAsMap = (Map<String, Object>) map.get("dataAsMap");
+        return TokenWeightUtil.fetchListOfTokenWeightMap(List.of(dataAsMap)).get(0);
+    }
+
+    // rank_features fields use Lucene's FeatureField, which compresses the float value down to 16 bits.
+    // This function simulates the encoding and decoding process in Lucene's FeatureField.
+    protected Float getFeatureFieldCompressedNumber(final Float originNumber) {
+        int freqBits = Float.floatToIntBits(originNumber);
+        freqBits = freqBits >> 15;
+        freqBits = ((int) ((float) freqBits)) << 15;
+        return Float.intBitsToFloat(freqBits);
+    }
+
+    // Clean up all the test resources after execution of the tests.
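+    // Editor's note (not in the original patch): the deletion order below likely matters in
+    // practice -- pipelines are removed before the model they reference, and deleteModel
+    // (via pollForModelState above) waits for the model to reach UNDEPLOYED or DEPLOY_FAILED
+    // before issuing the delete, so teardown does not race the ml-commons undeploy job.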
+ protected void wipeOfTestResources( + final String indexName, + final String ingestPipeline, + final String modelId, + final String searchPipeline + ) throws IOException { + if (ingestPipeline != null) { + deletePipeline(ingestPipeline); + } + if (searchPipeline != null) { + deleteSearchPipeline(searchPipeline); + } + if (modelId != null) { + deleteModel(modelId); + } + if (indexName != null) { + deleteIndex(indexName); + } + } + /** * Enumeration for types of pipeline processors, used to lookup resources like create * processor request as those are type specific diff --git a/src/testFixtures/java/org/opensearch/neuralsearch/BaseSparseEncodingIT.java b/src/testFixtures/java/org/opensearch/neuralsearch/BaseSparseEncodingIT.java deleted file mode 100644 index 3072bc767..000000000 --- a/src/testFixtures/java/org/opensearch/neuralsearch/BaseSparseEncodingIT.java +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ -package org.opensearch.neuralsearch; - -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.Collections; -import java.util.List; -import java.util.Map; - -import org.apache.hc.core5.http.HttpHeaders; -import org.apache.hc.core5.http.io.entity.EntityUtils; -import org.apache.hc.core5.http.message.BasicHeader; -import org.opensearch.client.Request; -import org.opensearch.client.Response; -import org.opensearch.common.xcontent.XContentFactory; -import org.opensearch.common.xcontent.XContentHelper; -import org.opensearch.common.xcontent.XContentType; -import org.opensearch.core.rest.RestStatus; -import org.opensearch.core.xcontent.XContentBuilder; -import org.opensearch.neuralsearch.util.TokenWeightUtil; - -import com.google.common.collect.ImmutableList; - -import lombok.SneakyThrows; - -public abstract class BaseSparseEncodingIT extends BaseNeuralSearchIT { - - @SneakyThrows - @Override - protected String prepareModel() { - String requestBody = Files.readString( - Path.of(classLoader.getResource("processor/UploadSparseEncodingModelRequestBody.json").toURI()) - ); - String modelId = registerModelGroupAndUploadModel(requestBody); - loadModel(modelId); - return modelId; - } - - @SneakyThrows - protected void prepareSparseEncodingIndex(String indexName, List sparseEncodingFieldNames) { - XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().startObject().startObject("mappings").startObject("properties"); - - for (String fieldName : sparseEncodingFieldNames) { - xContentBuilder.startObject(fieldName).field("type", "rank_features").endObject(); - } - - xContentBuilder.endObject().endObject().endObject(); - String indexMappings = xContentBuilder.toString(); - createIndexWithConfiguration(indexName, indexMappings, ""); - } - - @SneakyThrows - protected void addSparseEncodingDoc(String index, String docId, List fieldNames, List> docs) { - addSparseEncodingDoc(index, docId, fieldNames, docs, Collections.emptyList(), Collections.emptyList()); - } - - @SneakyThrows - protected void addSparseEncodingDoc( - String index, - String docId, - List fieldNames, - List> docs, - List textFieldNames, - List texts - ) { - Request request = new Request("POST", "/" + index + "/_doc/" + docId + "?refresh=true"); - XContentBuilder builder = XContentFactory.jsonBuilder().startObject(); - for (int i = 0; i < fieldNames.size(); i++) { - builder.field(fieldNames.get(i), docs.get(i)); - } - - for (int i = 0; i < textFieldNames.size(); i++) { - builder.field(textFieldNames.get(i), texts.get(i)); - } - 
builder.endObject(); - - request.setJsonEntity(builder.toString()); - Response response = client().performRequest(request); - assertEquals(request.getEndpoint() + ": failed", RestStatus.CREATED, RestStatus.fromCode(response.getStatusLine().getStatusCode())); - } - - protected float computeExpectedScore(String modelId, Map tokenWeightMap, String queryText) { - Map queryTokens = runSparseModelInference(modelId, queryText); - return computeExpectedScore(tokenWeightMap, queryTokens); - } - - protected float computeExpectedScore(Map tokenWeightMap, Map queryTokens) { - Float score = 0f; - for (Map.Entry entry : queryTokens.entrySet()) { - if (tokenWeightMap.containsKey(entry.getKey())) { - score += entry.getValue() * getFeatureFieldCompressedNumber(tokenWeightMap.get(entry.getKey())); - } - } - return score; - } - - @SneakyThrows - protected Map runSparseModelInference(String modelId, String queryText) { - Response inferenceResponse = makeRequest( - client(), - "POST", - String.format(LOCALE, "/_plugins/_ml/models/%s/_predict", modelId), - null, - toHttpEntity(String.format(LOCALE, "{\"text_docs\": [\"%s\"]}", queryText)), - ImmutableList.of(new BasicHeader(HttpHeaders.USER_AGENT, DEFAULT_USER_AGENT)) - ); - - Map inferenceResJson = XContentHelper.convertToMap( - XContentType.JSON.xContent(), - EntityUtils.toString(inferenceResponse.getEntity()), - false - ); - - Object inference_results = inferenceResJson.get("inference_results"); - assertTrue(inference_results instanceof List); - List inferenceResultsAsMap = (List) inference_results; - assertEquals(1, inferenceResultsAsMap.size()); - Map result = (Map) inferenceResultsAsMap.get(0); - List output = (List) result.get("output"); - assertEquals(1, output.size()); - Map map = (Map) output.get(0); - assertEquals(1, map.size()); - Map dataAsMap = (Map) map.get("dataAsMap"); - return TokenWeightUtil.fetchListOfTokenWeightMap(List.of(dataAsMap)).get(0); - } - - // rank_features use lucene FeatureField, which will compress the Float number to 16 bit - // this function simulate the encoding and decoding progress in lucene FeatureField - protected Float getFeatureFieldCompressedNumber(Float originNumber) { - int freqBits = Float.floatToIntBits(originNumber); - freqBits = freqBits >> 15; - freqBits = ((int) ((float) freqBits)) << 15; - return Float.intBitsToFloat(freqBits); - } -} diff --git a/src/testFixtures/java/org/opensearch/neuralsearch/TestUtils.java b/src/testFixtures/java/org/opensearch/neuralsearch/TestUtils.java index 99e396395..a6f4a3e0f 100644 --- a/src/testFixtures/java/org/opensearch/neuralsearch/TestUtils.java +++ b/src/testFixtures/java/org/opensearch/neuralsearch/TestUtils.java @@ -4,6 +4,7 @@ */ package org.opensearch.neuralsearch; +import com.carrotsearch.randomizedtesting.RandomizedTest; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -53,6 +54,16 @@ public class TestUtils { public static final String SECURITY_AUDITLOG_PREFIX = "security-auditlog"; public static final String OPENSEARCH_SYSTEM_INDEX_PREFIX = ".opensearch"; public static final String TEXT_EMBEDDING_PROCESSOR = "text_embedding"; + public static final String TEXT_IMAGE_EMBEDDING_PROCESSOR = "text_image_embedding"; + public static final int MAX_TASK_RESULT_QUERY_TIME_IN_SECOND = 60 * 5; + public static final int DEFAULT_TASK_RESULT_QUERY_INTERVAL_IN_MILLISECOND = 1000; + public static final String DEFAULT_USER_AGENT = "Kibana"; + public static final String DEFAULT_NORMALIZATION_METHOD 
= "min_max"; + public static final String DEFAULT_COMBINATION_METHOD = "arithmetic_mean"; + public static final String PARAM_NAME_WEIGHTS = "weights"; + public static final String SPARSE_ENCODING_PROCESSOR = "sparse_encoding"; + public static final int MAX_TIME_OUT_INTERVAL = 3000; + public static final int MAX_RETRY = 5; /** * Convert an xContentBuilder to a map @@ -304,10 +315,16 @@ private static Optional getMaxScore(Map searchResponseAsM } public static String getModelId(Map pipeline, String processor) { + assertNotNull(pipeline); ArrayList> processors = (ArrayList>) pipeline.get("processors"); - Map textEmbeddingProcessor = (Map) processors.get(0).get(processor); + String modelId = (String) textEmbeddingProcessor.get("model_id"); + assertNotNull(modelId); + return modelId; + } - return (String) textEmbeddingProcessor.get("model_id"); + public static String generateModelId() { + return "public_model_" + RandomizedTest.randomAsciiAlphanumOfLength(8); } + }