From ed81ada14ce458418d5f38169b7fc57e0f11eb31 Mon Sep 17 00:00:00 2001 From: "opensearch-trigger-bot[bot]" <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com> Date: Wed, 7 Aug 2024 16:19:42 -0700 Subject: [PATCH] Enhance syntax for nested mapping in destination fields (#841) (#842) * Enhance syntax for nested mapping in destination fields Signed-off-by: Martin Gaievski (cherry picked from commit 770a8caa05ea7e6956226996c0f2f5f403236f17) Signed-off-by: Martin Gaievski Co-authored-by: Martin Gaievski --- .../processor/InferenceProcessor.java | 52 +++++--- .../processor/TextEmbeddingProcessorIT.java | 61 ++++++--- .../TextEmbeddingProcessorTests.java | 123 ++++++++++++++++++ .../resources/processor/IndexMappings.json | 27 ++-- ...eConfigurationWithNestedFieldsMapping.json | 2 +- src/test/resources/processor/ingest_doc3.json | 5 +- src/test/resources/processor/ingest_doc4.json | 20 +++ 7 files changed, 241 insertions(+), 49 deletions(-) create mode 100644 src/test/resources/processor/ingest_doc4.json diff --git a/src/main/java/org/opensearch/neuralsearch/processor/InferenceProcessor.java b/src/main/java/org/opensearch/neuralsearch/processor/InferenceProcessor.java index 97f2f1837..30780a3f5 100644 --- a/src/main/java/org/opensearch/neuralsearch/processor/InferenceProcessor.java +++ b/src/main/java/org/opensearch/neuralsearch/processor/InferenceProcessor.java @@ -15,6 +15,7 @@ import java.util.Map; import java.util.Objects; import java.util.function.BiConsumer; +import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -29,6 +30,7 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.env.Environment; import org.opensearch.index.mapper.IndexFieldMapper; + import org.opensearch.ingest.AbstractBatchingProcessor; import org.opensearch.ingest.IngestDocument; import org.opensearch.ingest.IngestDocumentWrapper; @@ -50,6 +52,17 @@ public abstract class InferenceProcessor extends AbstractBatchingProcessor { public static final String MODEL_ID_FIELD = "model_id"; public static final String FIELD_MAP_FIELD = "field_map"; + private static final BiFunction REMAPPING_FUNCTION = (v1, v2) -> { + if (v1 instanceof Collection && v2 instanceof Collection) { + ((Collection) v1).addAll((Collection) v2); + return v1; + } else if (v1 instanceof Map && v2 instanceof Map) { + ((Map) v1).putAll((Map) v2); + return v1; + } else { + return v2; + } + }; private final String type; @@ -325,17 +338,7 @@ void buildNestedMap(String parentKey, Object processorKey, Map s buildNestedMap(nestedFieldMapEntry.getKey(), nestedFieldMapEntry.getValue(), map, next); } } - treeRes.merge(parentKey, next, (v1, v2) -> { - if (v1 instanceof Collection && v2 instanceof Collection) { - ((Collection) v1).addAll((Collection) v2); - return v1; - } else if (v1 instanceof Map && v2 instanceof Map) { - ((Map) v1).putAll((Map) v2); - return v1; - } else { - return v2; - } - }); + treeRes.merge(parentKey, next, REMAPPING_FUNCTION); } else { String key = String.valueOf(processorKey); treeRes.put(key, sourceAndMetadataMap.get(parentKey)); @@ -389,8 +392,9 @@ Map buildNLPResult(Map processorMap, List res IndexWrapper indexWrapper = new IndexWrapper(0); Map result = new LinkedHashMap<>(); for (Map.Entry knnMapEntry : processorMap.entrySet()) { - String knnKey = knnMapEntry.getKey(); - Object sourceValue = knnMapEntry.getValue(); + Pair processedNestedKey = processNestedKey(knnMapEntry); + String knnKey = processedNestedKey.getKey(); + Object sourceValue = processedNestedKey.getValue(); if (sourceValue instanceof String) { result.put(knnKey, results.get(indexWrapper.index++)); } else if (sourceValue instanceof List) { @@ -419,19 +423,31 @@ private void putNLPResultToSourceMapForMapType( nestedElement.put(inputNestedMapEntry.getKey(), results.get(indexWrapper.index++)); } } else { + Pair processedNestedKey = processNestedKey(inputNestedMapEntry); + Map sourceMap; + if (sourceAndMetadataMap.get(processorKey) == null) { + sourceMap = new HashMap<>(); + sourceAndMetadataMap.put(processorKey, sourceMap); + } else { + sourceMap = (Map) sourceAndMetadataMap.get(processorKey); + } putNLPResultToSourceMapForMapType( - inputNestedMapEntry.getKey(), - inputNestedMapEntry.getValue(), + processedNestedKey.getKey(), + processedNestedKey.getValue(), results, indexWrapper, - (Map) sourceAndMetadataMap.get(processorKey) + sourceMap ); } } } else if (sourceValue instanceof String) { - sourceAndMetadataMap.put(processorKey, results.get(indexWrapper.index++)); + sourceAndMetadataMap.merge(processorKey, results.get(indexWrapper.index++), REMAPPING_FUNCTION); } else if (sourceValue instanceof List) { - sourceAndMetadataMap.put(processorKey, buildNLPResultForListType((List) sourceValue, results, indexWrapper)); + sourceAndMetadataMap.merge( + processorKey, + buildNLPResultForListType((List) sourceValue, results, indexWrapper), + REMAPPING_FUNCTION + ); } } diff --git a/src/test/java/org/opensearch/neuralsearch/processor/TextEmbeddingProcessorIT.java b/src/test/java/org/opensearch/neuralsearch/processor/TextEmbeddingProcessorIT.java index b8415e4d6..37869c1a3 100644 --- a/src/test/java/org/opensearch/neuralsearch/processor/TextEmbeddingProcessorIT.java +++ b/src/test/java/org/opensearch/neuralsearch/processor/TextEmbeddingProcessorIT.java @@ -14,6 +14,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Set; import org.apache.http.HttpHeaders; @@ -41,10 +42,15 @@ public class TextEmbeddingProcessorIT extends BaseNeuralSearchIT { protected static final String LEVEL_1_FIELD = "nested_passages"; protected static final String LEVEL_2_FIELD = "level_2"; protected static final String LEVEL_3_FIELD_TEXT = "level_3_text"; + protected static final String LEVEL_3_FIELD_CONTAINER = "level_3_container"; protected static final String LEVEL_3_FIELD_EMBEDDING = "level_3_embedding"; + protected static final String TEXT_FIELD_VALUE_1 = "hello"; + protected static final String TEXT_FIELD_VALUE_2 = "clown"; + protected static final String TEXT_FIELD_VALUE_3 = "abc"; private final String INGEST_DOC1 = Files.readString(Path.of(classLoader.getResource("processor/ingest_doc1.json").toURI())); private final String INGEST_DOC2 = Files.readString(Path.of(classLoader.getResource("processor/ingest_doc2.json").toURI())); private final String INGEST_DOC3 = Files.readString(Path.of(classLoader.getResource("processor/ingest_doc3.json").toURI())); + private final String INGEST_DOC4 = Files.readString(Path.of(classLoader.getResource("processor/ingest_doc4.json").toURI())); private final String BULK_ITEM_TEMPLATE = Files.readString( Path.of(classLoader.getResource("processor/bulk_item_template.json").toURI()) ); @@ -99,23 +105,17 @@ public void testNestedFieldMapping_whenDocumentsIngested_thenSuccessful() throws createPipelineProcessor(modelId, PIPELINE_NAME, ProcessorType.TEXT_EMBEDDING_WITH_NESTED_FIELDS_MAPPING); createTextEmbeddingIndex(); ingestDocument(INGEST_DOC3, "3"); + ingestDocument(INGEST_DOC4, "4"); - Map sourceMap = (Map) getDocById(INDEX_NAME, "3").get("_source"); - assertNotNull(sourceMap); - assertTrue(sourceMap.containsKey(LEVEL_1_FIELD)); - Map nestedPassages = (Map) sourceMap.get(LEVEL_1_FIELD); - assertTrue(nestedPassages.containsKey(LEVEL_2_FIELD)); - Map level2 = (Map) nestedPassages.get(LEVEL_2_FIELD); - assertEquals(QUERY_TEXT, level2.get(LEVEL_3_FIELD_TEXT)); - assertTrue(level2.containsKey(LEVEL_3_FIELD_EMBEDDING)); - List embeddings = (List) level2.get(LEVEL_3_FIELD_EMBEDDING); - assertEquals(768, embeddings.size()); - for (Double embedding : embeddings) { - assertTrue(embedding >= 0.0 && embedding <= 1.0); - } + assertDoc( + (Map) getDocById(INDEX_NAME, "3").get("_source"), + TEXT_FIELD_VALUE_1, + Optional.of(TEXT_FIELD_VALUE_3) + ); + assertDoc((Map) getDocById(INDEX_NAME, "4").get("_source"), TEXT_FIELD_VALUE_2, Optional.empty()); NeuralQueryBuilder neuralQueryBuilderQuery = new NeuralQueryBuilder( - LEVEL_1_FIELD + "." + LEVEL_2_FIELD + "." + LEVEL_3_FIELD_EMBEDDING, + LEVEL_1_FIELD + "." + LEVEL_2_FIELD + "." + LEVEL_3_FIELD_CONTAINER + "." + LEVEL_3_FIELD_EMBEDDING, QUERY_TEXT, "", modelId, @@ -133,7 +133,7 @@ public void testNestedFieldMapping_whenDocumentsIngested_thenSuccessful() throws ); QueryBuilder queryNestedHighLevel = QueryBuilders.nestedQuery(LEVEL_1_FIELD, queryNestedLowerLevel, ScoreMode.Total); - Map searchResponseAsMap = search(INDEX_NAME, queryNestedHighLevel, 1); + Map searchResponseAsMap = search(INDEX_NAME, queryNestedHighLevel, 2); assertNotNull(searchResponseAsMap); Map hits = (Map) searchResponseAsMap.get("hits"); @@ -142,15 +142,38 @@ public void testNestedFieldMapping_whenDocumentsIngested_thenSuccessful() throws assertEquals(1.0, hits.get("max_score")); List> listOfHits = (List>) hits.get("hits"); assertNotNull(listOfHits); - assertEquals(1, listOfHits.size()); - Map hitsInner = listOfHits.get(0); - assertEquals("3", hitsInner.get("_id")); - assertEquals(1.0, hitsInner.get("_score")); + assertEquals(2, listOfHits.size()); + + Map innerHitDetails = listOfHits.get(0); + assertEquals("3", innerHitDetails.get("_id")); + assertEquals(1.0, innerHitDetails.get("_score")); + + innerHitDetails = listOfHits.get(1); + assertEquals("4", innerHitDetails.get("_id")); + assertTrue((double) innerHitDetails.get("_score") <= 1.0); } finally { wipeOfTestResources(INDEX_NAME, PIPELINE_NAME, modelId, null); } } + private void assertDoc(Map sourceMap, String textFieldValue, Optional level3ExpectedValue) { + assertNotNull(sourceMap); + assertTrue(sourceMap.containsKey(LEVEL_1_FIELD)); + Map nestedPassages = (Map) sourceMap.get(LEVEL_1_FIELD); + assertTrue(nestedPassages.containsKey(LEVEL_2_FIELD)); + Map level2 = (Map) nestedPassages.get(LEVEL_2_FIELD); + assertEquals(textFieldValue, level2.get(LEVEL_3_FIELD_TEXT)); + Map level3 = (Map) level2.get(LEVEL_3_FIELD_CONTAINER); + List embeddings = (List) level3.get(LEVEL_3_FIELD_EMBEDDING); + assertEquals(768, embeddings.size()); + for (Double embedding : embeddings) { + assertTrue(embedding >= 0.0 && embedding <= 1.0); + } + if (level3ExpectedValue.isPresent()) { + assertEquals(level3ExpectedValue.get(), level3.get("level_4_text_field")); + } + } + public void testTextEmbeddingProcessor_withBatchSizeInProcessor() throws Exception { String modelId = null; try { diff --git a/src/test/java/org/opensearch/neuralsearch/processor/TextEmbeddingProcessorTests.java b/src/test/java/org/opensearch/neuralsearch/processor/TextEmbeddingProcessorTests.java index 95ae1a2de..82b24324c 100644 --- a/src/test/java/org/opensearch/neuralsearch/processor/TextEmbeddingProcessorTests.java +++ b/src/test/java/org/opensearch/neuralsearch/processor/TextEmbeddingProcessorTests.java @@ -59,6 +59,9 @@ public class TextEmbeddingProcessorTests extends InferenceProcessorTestCase { protected static final String CHILD_FIELD_LEVEL_2 = "child_level2"; protected static final String CHILD_LEVEL_2_TEXT_FIELD_VALUE = "text_field_value"; protected static final String CHILD_LEVEL_2_KNN_FIELD = "test3_knn"; + protected static final String CHILD_1_TEXT_FIELD = "child_1_text_field"; + protected static final String TEXT_VALUE_1 = "text_value"; + protected static final String TEXT_FIELD_2 = "abc"; @Mock private MLCommonsClientAccessor mlCommonsClientAccessor; @@ -363,6 +366,126 @@ public void testNestedFieldInMapping_withMapTypeInput_successful() { } } + @SneakyThrows + public void testNestedFieldInMappingForSourceAndDestination_withIngestDocumentHasTheDestinationStructure_theSuccessful() { + Map sourceAndMetadata = new HashMap<>(); + sourceAndMetadata.put(IndexFieldMapper.NAME, "my_index"); + /* + modeling following document: + parent: + child_level_1: + child_level_1_text_field: "text" + child_level_2: + child_level_2_text_field: "abc" + */ + Map childLevel2NestedField = new HashMap<>(); + childLevel2NestedField.put(CHILD_LEVEL_2_TEXT_FIELD_VALUE, TEXT_FIELD_2); + Map childLevel2 = new HashMap<>(); + childLevel2.put(CHILD_FIELD_LEVEL_2, childLevel2NestedField); + childLevel2.put(CHILD_1_TEXT_FIELD, TEXT_VALUE_1); + Map childLevel1 = new HashMap<>(); + childLevel1.put(CHILD_FIELD_LEVEL_1, childLevel2); + sourceAndMetadata.put(PARENT_FIELD, childLevel1); + IngestDocument ingestDocument = new IngestDocument(sourceAndMetadata, new HashMap<>()); + + Map registry = new HashMap<>(); + Map config = new HashMap<>(); + config.put(TextEmbeddingProcessor.MODEL_ID_FIELD, "mockModelId"); + config.put( + TextEmbeddingProcessor.FIELD_MAP_FIELD, + ImmutableMap.of( + String.join(".", Arrays.asList(PARENT_FIELD, CHILD_FIELD_LEVEL_1, CHILD_1_TEXT_FIELD)), + CHILD_FIELD_LEVEL_2 + "." + CHILD_LEVEL_2_KNN_FIELD + ) + ); + TextEmbeddingProcessor processor = (TextEmbeddingProcessor) textEmbeddingProcessorFactory.create( + registry, + PROCESSOR_TAG, + DESCRIPTION, + config + ); + + List> modelTensorList = createRandomOneDimensionalMockVector(1, 100, 0.0f, 1.0f); + doAnswer(invocation -> { + ActionListener>> listener = invocation.getArgument(2); + listener.onResponse(modelTensorList); + return null; + }).when(mlCommonsClientAccessor).inferenceSentences(anyString(), anyList(), isA(ActionListener.class)); + + processor.execute(ingestDocument, (BiConsumer) (doc, ex) -> {}); + assertNotNull(ingestDocument); + assertNotNull(ingestDocument.getSourceAndMetadata().get(PARENT_FIELD)); + Map parent1AfterProcessor = (Map) ingestDocument.getSourceAndMetadata().get(PARENT_FIELD); + Map childLevel1Actual = (Map) parent1AfterProcessor.get(CHILD_FIELD_LEVEL_1); + assertEquals(2, childLevel1Actual.size()); + assertEquals(TEXT_VALUE_1, childLevel1Actual.get(CHILD_1_TEXT_FIELD)); + Map child2Actual = (Map) childLevel1Actual.get(CHILD_FIELD_LEVEL_2); + assertEquals(2, child2Actual.size()); + assertEquals(TEXT_FIELD_2, child2Actual.get(CHILD_LEVEL_2_TEXT_FIELD_VALUE)); + List vectors = (List) child2Actual.get(CHILD_LEVEL_2_KNN_FIELD); + assertEquals(100, vectors.size()); + for (Float vector : vectors) { + assertTrue(vector >= 0.0f && vector <= 1.0f); + } + } + + @SneakyThrows + public void testNestedFieldInMappingForSourceAndDestination_withIngestDocumentWithoutDestinationStructure_theSuccessful() { + Map sourceAndMetadata = new HashMap<>(); + sourceAndMetadata.put(IndexFieldMapper.NAME, "my_index"); + /* + modeling following document: + parent: + child_level_1: + child_level_1_text_field: "text" + */ + Map childLevel2 = new HashMap<>(); + childLevel2.put(CHILD_1_TEXT_FIELD, TEXT_VALUE_1); + Map childLevel1 = new HashMap<>(); + childLevel1.put(CHILD_FIELD_LEVEL_1, childLevel2); + sourceAndMetadata.put(PARENT_FIELD, childLevel1); + IngestDocument ingestDocument = new IngestDocument(sourceAndMetadata, new HashMap<>()); + + Map registry = new HashMap<>(); + Map config = new HashMap<>(); + config.put(TextEmbeddingProcessor.MODEL_ID_FIELD, "mockModelId"); + config.put( + TextEmbeddingProcessor.FIELD_MAP_FIELD, + ImmutableMap.of( + String.join(".", Arrays.asList(PARENT_FIELD, CHILD_FIELD_LEVEL_1, CHILD_1_TEXT_FIELD)), + CHILD_FIELD_LEVEL_2 + "." + CHILD_LEVEL_2_KNN_FIELD + ) + ); + TextEmbeddingProcessor processor = (TextEmbeddingProcessor) textEmbeddingProcessorFactory.create( + registry, + PROCESSOR_TAG, + DESCRIPTION, + config + ); + + List> modelTensorList = createRandomOneDimensionalMockVector(1, 100, 0.0f, 1.0f); + doAnswer(invocation -> { + ActionListener>> listener = invocation.getArgument(2); + listener.onResponse(modelTensorList); + return null; + }).when(mlCommonsClientAccessor).inferenceSentences(anyString(), anyList(), isA(ActionListener.class)); + + processor.execute(ingestDocument, (BiConsumer) (doc, ex) -> {}); + assertNotNull(ingestDocument); + assertNotNull(ingestDocument.getSourceAndMetadata().get(PARENT_FIELD)); + Map parent1AfterProcessor = (Map) ingestDocument.getSourceAndMetadata().get(PARENT_FIELD); + Map childLevel1Actual = (Map) parent1AfterProcessor.get(CHILD_FIELD_LEVEL_1); + assertEquals(2, childLevel1Actual.size()); + assertEquals(TEXT_VALUE_1, childLevel1Actual.get(CHILD_1_TEXT_FIELD)); + Map child2Actual = (Map) childLevel1Actual.get(CHILD_FIELD_LEVEL_2); + assertEquals(1, child2Actual.size()); + List vectors = (List) child2Actual.get(CHILD_LEVEL_2_KNN_FIELD); + assertEquals(100, vectors.size()); + for (Float vector : vectors) { + assertTrue(vector >= 0.0f && vector <= 1.0f); + } + } + @SneakyThrows public void testNestedFieldInMappingMixedSyntax_withMapTypeInput_successful() { Map sourceAndMetadata = new HashMap<>(); diff --git a/src/test/resources/processor/IndexMappings.json b/src/test/resources/processor/IndexMappings.json index 79eb34ce4..7afbaa92e 100644 --- a/src/test/resources/processor/IndexMappings.json +++ b/src/test/resources/processor/IndexMappings.json @@ -109,16 +109,23 @@ "level_3_text": { "type": "text" }, - "level_3_embedding": { - "type": "knn_vector", - "dimension": 768, - "method": { - "name": "hnsw", - "space_type": "l2", - "engine": "lucene", - "parameters": { - "ef_construction": 128, - "m": 24 + "level_3_container": { + "properties": { + "level_3_embedding": { + "type": "knn_vector", + "dimension": 768, + "method": { + "name": "hnsw", + "space_type": "l2", + "engine": "lucene", + "parameters": { + "ef_construction": 128, + "m": 24 + } + } + }, + "random_field1": { + "type": "text" } } } diff --git a/src/test/resources/processor/PipelineConfigurationWithNestedFieldsMapping.json b/src/test/resources/processor/PipelineConfigurationWithNestedFieldsMapping.json index 13bae8776..e768a8621 100644 --- a/src/test/resources/processor/PipelineConfigurationWithNestedFieldsMapping.json +++ b/src/test/resources/processor/PipelineConfigurationWithNestedFieldsMapping.json @@ -11,7 +11,7 @@ "game": "game_knn", "movie": "movie_knn" }, - "nested_passages.level_2.level_3_text": "level_3_embedding" + "nested_passages.level_2.level_3_text": "level_3_container.level_3_embedding" } } } diff --git a/src/test/resources/processor/ingest_doc3.json b/src/test/resources/processor/ingest_doc3.json index 8eae12fe2..3a81f838d 100644 --- a/src/test/resources/processor/ingest_doc3.json +++ b/src/test/resources/processor/ingest_doc3.json @@ -14,7 +14,10 @@ { "level_2": { - "level_3_text": "hello" + "level_3_text": "hello", + "level_3_container": { + "level_4_text_field": "abc" + } } } } diff --git a/src/test/resources/processor/ingest_doc4.json b/src/test/resources/processor/ingest_doc4.json new file mode 100644 index 000000000..b5d1671a4 --- /dev/null +++ b/src/test/resources/processor/ingest_doc4.json @@ -0,0 +1,20 @@ +{ + "title": "This is a good day", + "description": "daily logging", + "favor_list": [ + "key", + "hey", + "click" + ], + "favorites": { + "game": "cossacks", + "movie": "matrix" + }, + "nested_passages": + { + "level_2": + { + "level_3_text": "clown" + } + } +}