From 8b629484277b161e01fd6affc8ab7ab2f580bbeb Mon Sep 17 00:00:00 2001 From: "opensearch-trigger-bot[bot]" <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com> Date: Tue, 4 Jun 2024 22:52:49 -0700 Subject: [PATCH] [Derived Field] Dynamic FieldType inference based on random sampling of documents (#13592) (#13953) --------- (cherry picked from commit 6c1896bd11e18efeae51dc9ec4b8783987e5d1f8) Signed-off-by: Rishabh Maurya Signed-off-by: github-actions[bot] Co-authored-by: github-actions[bot] --- .../index/mapper/FieldTypeInference.java | 181 +++++++++++++++ .../index/mapper/FieldTypeInferenceTests.java | 210 ++++++++++++++++++ 2 files changed, 391 insertions(+) create mode 100644 server/src/main/java/org/opensearch/index/mapper/FieldTypeInference.java create mode 100644 server/src/test/java/org/opensearch/index/mapper/FieldTypeInferenceTests.java diff --git a/server/src/main/java/org/opensearch/index/mapper/FieldTypeInference.java b/server/src/main/java/org/opensearch/index/mapper/FieldTypeInference.java new file mode 100644 index 0000000000000..713bdc4e691cd --- /dev/null +++ b/server/src/main/java/org/opensearch/index/mapper/FieldTypeInference.java @@ -0,0 +1,181 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.mapper; + +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.ReaderUtil; +import org.opensearch.common.Randomness; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.common.xcontent.json.JsonXContent; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.search.lookup.SourceLookup; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Random; +import java.util.Set; +import java.util.TreeSet; + +/** + * This class performs type inference by analyzing the _source documents. It uses a random sample of documents to infer the field type, similar to dynamic mapping type guessing logic. + * Unlike guessing based on the first document, where field could be missing, this method generates a random sample to make a more accurate inference. + * This approach is especially useful for handling missing fields, which is common in nested fields within derived fields of object types. + * + *

The sample size should be chosen carefully to ensure a high probability of selecting at least one document where the field is present. + * However, it's essential to strike a balance because a large sample size can lead to performance issues since each sample document's _source field is loaded and examined until the field is found. + * + *

Determining the sample size ({@code S}) is akin to deciding how many balls to draw from a bin, ensuring a high probability ({@code >=P}) of drawing at least one green ball (documents with the field) from a mixture of {@code R } red balls (documents without the field) and {@code G } green balls: + *

{@code
+ * P >= 1 - C(R, S) / C(R + G, S)
+ * }
+ * Here, {@code C()} represents the binomial coefficient. + * For a high confidence level, we aim for {@code P >= 0.95 }. For example, with {@code 10^7 } documents where the field is present in {@code 2% } of them, the sample size {@code S } should be around 149 to achieve a probability of {@code 0.95}. + */ +public class FieldTypeInference { + private final IndexReader indexReader; + private final String indexName; + private final MapperService mapperService; + // TODO expose using a index setting + private int sampleSize; + private static final int DEFAULT_SAMPLE_SIZE = 150; + private static final int MAX_SAMPLE_SIZE_ALLOWED = 1000; + + public FieldTypeInference(String indexName, MapperService mapperService, IndexReader indexReader) { + this.indexName = indexName; + this.mapperService = mapperService; + this.indexReader = indexReader; + this.sampleSize = DEFAULT_SAMPLE_SIZE; + } + + public void setSampleSize(int sampleSize) { + if (sampleSize > MAX_SAMPLE_SIZE_ALLOWED) { + throw new IllegalArgumentException("sample_size should be less than " + MAX_SAMPLE_SIZE_ALLOWED); + } + this.sampleSize = sampleSize; + } + + public int getSampleSize() { + return sampleSize; + } + + public Mapper infer(ValueFetcher valueFetcher) throws IOException { + RandomSourceValuesGenerator valuesGenerator = new RandomSourceValuesGenerator(sampleSize, indexReader, valueFetcher); + Mapper inferredMapper = null; + while (inferredMapper == null && valuesGenerator.hasNext()) { + List values = valuesGenerator.next(); + if (values == null || values.isEmpty()) { + continue; + } + // always use first value in case of multi value field to infer type + inferredMapper = inferTypeFromObject(values.get(0)); + } + return inferredMapper; + } + + private Mapper inferTypeFromObject(Object o) throws IOException { + if (o == null) { + return null; + } + DocumentMapper mapper = mapperService.documentMapper(); + XContentBuilder builder = XContentFactory.jsonBuilder().startObject().field("field", o).endObject(); + BytesReference bytesReference = BytesReference.bytes(builder); + SourceToParse sourceToParse = new SourceToParse(indexName, "_id", bytesReference, JsonXContent.jsonXContent.mediaType()); + ParsedDocument parsedDocument = mapper.parse(sourceToParse); + Mapping mapping = parsedDocument.dynamicMappingsUpdate(); + return mapping.root.getMapper("field"); + } + + private static class RandomSourceValuesGenerator implements Iterator> { + private final ValueFetcher valueFetcher; + private final IndexReader indexReader; + private final SourceLookup sourceLookup; + private final int[] docs; + private int iter; + private int leaf; + private final int MAX_ATTEMPTS_TO_GENERATE_RANDOM_SAMPLES = 10000; + + public RandomSourceValuesGenerator(int sampleSize, IndexReader indexReader, ValueFetcher valueFetcher) { + this.valueFetcher = valueFetcher; + this.indexReader = indexReader; + sampleSize = Math.min(sampleSize, indexReader.numDocs()); + this.docs = getSortedRandomNum( + sampleSize, + indexReader.numDocs(), + Math.max(sampleSize, MAX_ATTEMPTS_TO_GENERATE_RANDOM_SAMPLES) + ); + this.iter = 0; + this.leaf = -1; + this.sourceLookup = new SourceLookup(); + if (hasNext()) { + setNextLeaf(); + } + } + + @Override + public boolean hasNext() { + return iter < docs.length && leaf < indexReader.leaves().size(); + } + + /** + * Ensure hasNext() is called before calling next() + */ + @Override + public List next() { + int docID = docs[iter] - indexReader.leaves().get(leaf).docBase; + if (docID >= indexReader.leaves().get(leaf).reader().numDocs()) { + setNextLeaf(); + } + // deleted docs are getting used to infer type, which should be okay? + sourceLookup.setSegmentAndDocument(indexReader.leaves().get(leaf), docs[iter] - indexReader.leaves().get(leaf).docBase); + try { + iter++; + return valueFetcher.fetchValues(sourceLookup); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private void setNextLeaf() { + int readerIndex = ReaderUtil.subIndex(docs[iter], indexReader.leaves()); + if (readerIndex != leaf) { + leaf = readerIndex; + } else { + // this will only happen when leaves are exhausted and readerIndex will be indexReader.leaves()-1. + leaf++; + } + if (leaf < indexReader.leaves().size()) { + valueFetcher.setNextReader(indexReader.leaves().get(leaf)); + } + } + + private static int[] getSortedRandomNum(int sampleSize, int upperBound, int attempts) { + Set generatedNumbers = new TreeSet<>(); + Random random = Randomness.get(); + int itr = 0; + if (upperBound <= 10 * sampleSize) { + List numberList = new ArrayList<>(); + for (int i = 0; i < upperBound; i++) { + numberList.add(i); + } + Collections.shuffle(numberList, random); + generatedNumbers.addAll(numberList.subList(0, sampleSize)); + } else { + while (generatedNumbers.size() < sampleSize && itr++ < attempts) { + int randomNumber = random.nextInt(upperBound); + generatedNumbers.add(randomNumber); + } + } + return generatedNumbers.stream().mapToInt(Integer::valueOf).toArray(); + } + } +} diff --git a/server/src/test/java/org/opensearch/index/mapper/FieldTypeInferenceTests.java b/server/src/test/java/org/opensearch/index/mapper/FieldTypeInferenceTests.java new file mode 100644 index 0000000000000..807d0e96ce5e3 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/mapper/FieldTypeInferenceTests.java @@ -0,0 +1,210 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.mapper; + +import org.apache.lucene.document.Document; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.store.Directory; +import org.opensearch.common.lucene.Lucene; +import org.opensearch.core.index.Index; +import org.opensearch.index.query.QueryShardContext; +import org.opensearch.search.lookup.SourceLookup; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.mockito.Mockito.when; + +public class FieldTypeInferenceTests extends MapperServiceTestCase { + + private static final Map> documentMap; + static { + List listWithNull = new ArrayList<>(); + listWithNull.add(null); + documentMap = new HashMap<>(); + documentMap.put("text_field", List.of("The quick brown fox jumps over the lazy dog.")); + documentMap.put("int_field", List.of(789)); + documentMap.put("float_field", List.of(123.45)); + documentMap.put("date_field_1", List.of("2024-05-12T15:45:00Z")); + documentMap.put("date_field_2", List.of("2024-05-12")); + documentMap.put("boolean_field", List.of(true)); + documentMap.put("null_field", listWithNull); + documentMap.put("array_field_int", List.of(100, 200, 300, 400, 500)); + documentMap.put("array_field_text", List.of("100", "200")); + documentMap.put("object_type", List.of(Map.of("foo", Map.of("bar", 10)))); + } + + public void testJsonSupportedTypes() throws IOException { + MapperService mapperService = createMapperService(topMapping(b -> {})); + QueryShardContext queryShardContext = createQueryShardContext(mapperService); + when(queryShardContext.index()).thenReturn(new Index("test_index", "uuid")); + int totalDocs = 10000; + int docsPerLeafCount = 1000; + try (Directory dir = newDirectory()) { + IndexWriter iw = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER)); + Document d = new Document(); + for (int i = 0; i < totalDocs; i++) { + iw.addDocument(d); + if ((i + 1) % docsPerLeafCount == 0) { + iw.commit(); + } + } + try (IndexReader reader = DirectoryReader.open(iw)) { + iw.close(); + FieldTypeInference typeInference = new FieldTypeInference("test_index", queryShardContext.getMapperService(), reader); + String[] fieldName = { "text_field" }; + Mapper mapper = typeInference.infer(lookup -> documentMap.get(fieldName[0])); + assertEquals("text", mapper.typeName()); + + fieldName[0] = "int_field"; + mapper = typeInference.infer(lookup -> documentMap.get(fieldName[0])); + assertEquals("long", mapper.typeName()); + + fieldName[0] = "float_field"; + mapper = typeInference.infer(lookup -> documentMap.get(fieldName[0])); + assertEquals("float", mapper.typeName()); + + fieldName[0] = "date_field_1"; + mapper = typeInference.infer(lookup -> documentMap.get(fieldName[0])); + assertEquals("date", mapper.typeName()); + + fieldName[0] = "date_field_2"; + mapper = typeInference.infer(lookup -> documentMap.get(fieldName[0])); + assertEquals("date", mapper.typeName()); + + fieldName[0] = "boolean_field"; + mapper = typeInference.infer(lookup -> documentMap.get(fieldName[0])); + assertEquals("boolean", mapper.typeName()); + + fieldName[0] = "array_field_int"; + mapper = typeInference.infer(lookup -> documentMap.get(fieldName[0])); + assertEquals("long", mapper.typeName()); + + fieldName[0] = "array_field_text"; + mapper = typeInference.infer(lookup -> documentMap.get(fieldName[0])); + assertEquals("text", mapper.typeName()); + + fieldName[0] = "object_type"; + mapper = typeInference.infer(lookup -> documentMap.get(fieldName[0])); + assertEquals("object", mapper.typeName()); + + fieldName[0] = "null_field"; + mapper = typeInference.infer(lookup -> documentMap.get(fieldName[0])); + assertNull(mapper); + + // If field is missing ensure that sample docIDs generated for inference are ordered and are in bounds + fieldName[0] = "missing_field"; + List> docsEvaluated = new ArrayList<>(); + int[] totalDocsEvaluated = { 0 }; + typeInference.setSampleSize(50); + mapper = typeInference.infer(new ValueFetcher() { + @Override + public List fetchValues(SourceLookup lookup) throws IOException { + docsEvaluated.get(docsEvaluated.size() - 1).add(lookup.docId()); + totalDocsEvaluated[0]++; + return documentMap.get(fieldName[0]); + } + + @Override + public void setNextReader(LeafReaderContext leafReaderContext) { + docsEvaluated.add(new ArrayList<>()); + } + }); + assertNull(mapper); + assertEquals(typeInference.getSampleSize(), totalDocsEvaluated[0]); + for (List docsPerLeaf : docsEvaluated) { + for (int j = 0; j < docsPerLeaf.size() - 1; j++) { + assertTrue(docsPerLeaf.get(j) < docsPerLeaf.get(j + 1)); + } + if (!docsPerLeaf.isEmpty()) { + assertTrue(docsPerLeaf.get(0) >= 0 && docsPerLeaf.get(docsPerLeaf.size() - 1) < docsPerLeafCount); + } + } + } + } + } + + public void testDeleteAllDocs() throws IOException { + MapperService mapperService = createMapperService(topMapping(b -> {})); + QueryShardContext queryShardContext = createQueryShardContext(mapperService); + when(queryShardContext.index()).thenReturn(new Index("test_index", "uuid")); + int totalDocs = 10000; + int docsPerLeafCount = 1000; + try (Directory dir = newDirectory()) { + IndexWriter iw = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER)); + Document d = new Document(); + for (int i = 0; i < totalDocs; i++) { + iw.addDocument(d); + if ((i + 1) % docsPerLeafCount == 0) { + iw.commit(); + } + } + iw.deleteAll(); + iw.commit(); + + try (IndexReader reader = DirectoryReader.open(iw)) { + iw.close(); + FieldTypeInference typeInference = new FieldTypeInference("test_index", queryShardContext.getMapperService(), reader); + String[] fieldName = { "text_field" }; + Mapper mapper = typeInference.infer(lookup -> documentMap.get(fieldName[0])); + assertNull(mapper); + } + } + } + + public void testZeroDoc() throws IOException { + MapperService mapperService = createMapperService(topMapping(b -> {})); + QueryShardContext queryShardContext = createQueryShardContext(mapperService); + when(queryShardContext.index()).thenReturn(new Index("test_index", "uuid")); + try (Directory dir = newDirectory()) { + IndexWriter iw = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER)); + try (IndexReader reader = DirectoryReader.open(iw)) { + iw.close(); + FieldTypeInference typeInference = new FieldTypeInference("test_index", queryShardContext.getMapperService(), reader); + String[] fieldName = { "text_field" }; + Mapper mapper = typeInference.infer(lookup -> documentMap.get(fieldName[0])); + assertNull(mapper); + } + } + } + + public void testSampleGeneration() throws IOException { + MapperService mapperService = createMapperService(topMapping(b -> {})); + QueryShardContext queryShardContext = createQueryShardContext(mapperService); + when(queryShardContext.index()).thenReturn(new Index("test_index", "uuid")); + int totalDocs = 10000; + int docsPerLeafCount = 1000; + try (Directory dir = newDirectory()) { + IndexWriter iw = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER)); + Document d = new Document(); + for (int i = 0; i < totalDocs; i++) { + iw.addDocument(d); + if ((i + 1) % docsPerLeafCount == 0) { + iw.commit(); + } + } + try (IndexReader reader = DirectoryReader.open(iw)) { + iw.close(); + FieldTypeInference typeInference = new FieldTypeInference("test_index", queryShardContext.getMapperService(), reader); + typeInference.setSampleSize(1000 - 1); + typeInference.infer(lookup -> documentMap.get("unknown_field")); + assertThrows(IllegalArgumentException.class, () -> typeInference.setSampleSize(1000 + 1)); + typeInference.setSampleSize(1000); + typeInference.infer(lookup -> documentMap.get("unknown_field")); + } + } + } +}