diff --git a/server/src/main/java/org/apache/lucene/codecs/lucene90/Composite99DocValuesConsumer.java b/server/src/main/java/org/apache/lucene/codecs/lucene90/Composite99DocValuesConsumer.java new file mode 100644 index 0000000000000..c0a22a0b32fd2 --- /dev/null +++ b/server/src/main/java/org/apache/lucene/codecs/lucene90/Composite99DocValuesConsumer.java @@ -0,0 +1,68 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.apache.lucene.codecs.lucene90; + +import org.apache.lucene.codecs.DocValuesConsumer; +import org.apache.lucene.codecs.DocValuesProducer; +import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.SegmentWriteState; + +import java.io.IOException; + +/** + * This class is an abstraction of the {@link DocValuesConsumer} for the Star Tree index structure. + * It is responsible to consume various types of document values (numeric, binary, sorted, sorted numeric, + * and sorted set) for fields in the Star Tree index. + * + * @opensearch.experimental + */ +public class Composite99DocValuesConsumer extends DocValuesConsumer { + + Lucene90DocValuesConsumer lucene90DocValuesConsumer; + + public Composite99DocValuesConsumer( + SegmentWriteState state, + String dataCodec, + String dataExtension, + String metaCodec, + String metaExtension + ) throws IOException { + lucene90DocValuesConsumer = new Lucene90DocValuesConsumer(state, dataCodec, dataExtension, metaCodec, metaExtension); + } + + @Override + public void close() throws IOException { + lucene90DocValuesConsumer.close(); + } + + @Override + public void addNumericField(FieldInfo fieldInfo, DocValuesProducer docValuesProducer) throws IOException { + lucene90DocValuesConsumer.addNumericField(fieldInfo, docValuesProducer); + } + + @Override + public void addBinaryField(FieldInfo fieldInfo, DocValuesProducer docValuesProducer) throws IOException { + lucene90DocValuesConsumer.addNumericField(fieldInfo, docValuesProducer); + } + + @Override + public void addSortedField(FieldInfo fieldInfo, DocValuesProducer docValuesProducer) throws IOException { + lucene90DocValuesConsumer.addSortedField(fieldInfo, docValuesProducer); + } + + @Override + public void addSortedNumericField(FieldInfo fieldInfo, DocValuesProducer docValuesProducer) throws IOException { + lucene90DocValuesConsumer.addSortedNumericField(fieldInfo, docValuesProducer); + } + + @Override + public void addSortedSetField(FieldInfo fieldInfo, DocValuesProducer docValuesProducer) throws IOException { + lucene90DocValuesConsumer.addSortedSetField(fieldInfo, docValuesProducer); + } +} diff --git a/server/src/main/java/org/apache/lucene/codecs/lucene90/StarTree99DocValuesProducer.java b/server/src/main/java/org/apache/lucene/codecs/lucene90/StarTree99DocValuesProducer.java new file mode 100644 index 0000000000000..a2a8c1b4c35fe --- /dev/null +++ b/server/src/main/java/org/apache/lucene/codecs/lucene90/StarTree99DocValuesProducer.java @@ -0,0 +1,161 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.apache.lucene.codecs.lucene90; + +import org.apache.lucene.codecs.DocValuesProducer; +import org.apache.lucene.index.BinaryDocValues; +import org.apache.lucene.index.DocValuesType; +import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.FieldInfos; +import org.apache.lucene.index.IndexOptions; +import org.apache.lucene.index.NumericDocValues; +import org.apache.lucene.index.SegmentReadState; +import org.apache.lucene.index.SortedDocValues; +import org.apache.lucene.index.SortedNumericDocValues; +import org.apache.lucene.index.SortedSetDocValues; +import org.apache.lucene.index.VectorEncoding; +import org.apache.lucene.index.VectorSimilarityFunction; +import org.opensearch.index.compositeindex.datacube.startree.aggregators.MetricEntry; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +import static org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeHelper.fullFieldNameForStarTreeDimensionsDocValues; +import static org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeHelper.fullFieldNameForStarTreeMetricsDocValues; + +/** + * This class is a custom abstraction of the {@link DocValuesProducer} for the Star Tree index structure. + * It is responsible for providing access to various types of document values (numeric, binary, sorted, sorted numeric, + * and sorted set) for fields in the Star Tree index. + * + * @opensearch.experimental + */ +public class StarTree99DocValuesProducer extends DocValuesProducer { + + Lucene90DocValuesProducer lucene90DocValuesProducer; + private final List dimensions; + private final List metrics; + private final FieldInfos fieldInfos; + + public StarTree99DocValuesProducer( + SegmentReadState state, + String dataCodec, + String dataExtension, + String metaCodec, + String metaExtension, + List dimensions, + List metricEntries, + String compositeFieldName + ) throws IOException { + this.dimensions = dimensions; + this.metrics = metricEntries; + + // populates the dummy list of field infos to fetch doc id set iterators for respective fields. + // the dummy field info is used to fetch the doc id set iterators for respective fields based on field name + this.fieldInfos = new FieldInfos(getFieldInfoList(compositeFieldName)); + SegmentReadState segmentReadState = new SegmentReadState(state.directory, state.segmentInfo, fieldInfos, state.context); + lucene90DocValuesProducer = new Lucene90DocValuesProducer(segmentReadState, dataCodec, dataExtension, metaCodec, metaExtension); + } + + @Override + public NumericDocValues getNumeric(FieldInfo field) throws IOException { + return this.lucene90DocValuesProducer.getNumeric(field); + } + + @Override + public BinaryDocValues getBinary(FieldInfo field) throws IOException { + return this.lucene90DocValuesProducer.getBinary(field); + } + + @Override + public SortedDocValues getSorted(FieldInfo field) throws IOException { + return this.lucene90DocValuesProducer.getSorted(field); + } + + @Override + public SortedNumericDocValues getSortedNumeric(FieldInfo field) throws IOException { + return this.lucene90DocValuesProducer.getSortedNumeric(field); + } + + @Override + public SortedSetDocValues getSortedSet(FieldInfo field) throws IOException { + return this.lucene90DocValuesProducer.getSortedSet(field); + } + + @Override + public void checkIntegrity() throws IOException { + this.lucene90DocValuesProducer.checkIntegrity(); + } + + // returns the doc id set iterator based on field name + public SortedNumericDocValues getSortedNumeric(String fieldName) throws IOException { + return this.lucene90DocValuesProducer.getSortedNumeric(fieldInfos.fieldInfo(fieldName)); + } + + @Override + public void close() throws IOException { + this.lucene90DocValuesProducer.close(); + } + + private FieldInfo[] getFieldInfoList(String compositeFieldName) { + FieldInfo[] fieldInfoList = new FieldInfo[this.dimensions.size() + metrics.size()]; + + // field number is not really used. We depend on unique field names to get the desired iterator + int fieldNumber = 0; + + for (FieldInfo dimension : this.dimensions) { + fieldInfoList[fieldNumber] = new FieldInfo( + fullFieldNameForStarTreeDimensionsDocValues(compositeFieldName, dimension.getName()), + fieldNumber, + false, + false, + true, + IndexOptions.DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS, + DocValuesType.SORTED_NUMERIC, + -1, + Collections.emptyMap(), + 0, + 0, + 0, + 0, + VectorEncoding.FLOAT32, + VectorSimilarityFunction.EUCLIDEAN, + false, + false + ); + fieldNumber++; + } + for (MetricEntry metric : metrics) { + fieldInfoList[fieldNumber] = new FieldInfo( + fullFieldNameForStarTreeMetricsDocValues(compositeFieldName, metric.getMetricName(), metric.getMetricStat().getTypeName()), + fieldNumber, + false, + false, + true, + IndexOptions.DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS, + DocValuesType.SORTED_NUMERIC, + -1, + Collections.emptyMap(), + 0, + 0, + 0, + 0, + VectorEncoding.FLOAT32, + VectorSimilarityFunction.EUCLIDEAN, + false, + false + ); + fieldNumber++; + } + + return fieldInfoList; + } + +} diff --git a/server/src/main/java/org/apache/lucene/index/SortedNumericDocValuesWriterHelper.java b/server/src/main/java/org/apache/lucene/index/SortedNumericDocValuesWriterHelper.java new file mode 100644 index 0000000000000..df489c3f27442 --- /dev/null +++ b/server/src/main/java/org/apache/lucene/index/SortedNumericDocValuesWriterHelper.java @@ -0,0 +1,54 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.apache.lucene.index; + +import org.apache.lucene.util.Counter; + +/** + * A helper class for writing sorted numeric doc values. + *

+ * This class provides a convenient way to add sorted numeric doc values to a field + * and retrieve the corresponding {@link SortedNumericDocValues} instance. + * + * @opensearch.experimental + */ +public class SortedNumericDocValuesWriterHelper { + + private final SortedNumericDocValuesWriter sortedNumericDocValuesWriter; + + /** + * Sole constructor. Constructs a new {@link SortedNumericDocValuesWriterHelper} instance. + * + * @param fieldInfo the field information for the field being written + * @param counter a counter for tracking memory usage + */ + public SortedNumericDocValuesWriterHelper(FieldInfo fieldInfo, Counter counter){ + sortedNumericDocValuesWriter = new SortedNumericDocValuesWriter(fieldInfo, counter); + } + + + /** + * Adds a value to the sorted numeric doc values for the specified document. + * + * @param docID the document ID + * @param value the value to add + */ + public void addValue(int docID, long value) { + sortedNumericDocValuesWriter.addValue(docID, value); + } + + /** + * Returns the {@link SortedNumericDocValues} instance containing the sorted numeric doc values + * + * @return the {@link SortedNumericDocValues} instance + */ + public SortedNumericDocValues getDocValues() { + return sortedNumericDocValuesWriter.getDocValues(); + } +} diff --git a/server/src/main/java/org/opensearch/index/codec/composite/Composite99DocValuesFormat.java b/server/src/main/java/org/opensearch/index/codec/composite/Composite99DocValuesFormat.java index 216ed4f68f333..150dea6277fff 100644 --- a/server/src/main/java/org/opensearch/index/codec/composite/Composite99DocValuesFormat.java +++ b/server/src/main/java/org/opensearch/index/codec/composite/Composite99DocValuesFormat.java @@ -37,6 +37,36 @@ public class Composite99DocValuesFormat extends DocValuesFormat { private final DocValuesFormat delegate; private final MapperService mapperService; + /** Data codec name for Composite Doc Values Format */ + public static final String DATA_CODEC_NAME = "Composite99FormatData"; + + /** Meta codec name for Composite Doc Values Format */ + public static final String META_CODEC_NAME = "Composite99FormatMeta"; + + /** Filename extension for the composite index data */ + public static final String DATA_EXTENSION = "cid"; + + /** Filename extension for the composite index meta */ + public static final String META_EXTENSION = "cim"; + + /** Data doc values codec name for Composite Doc Values Format */ + public static final String DATA_DOC_VALUES_CODEC = "Composite99DocValuesData"; + + /** Meta doc values codec name for Composite Doc Values Format */ + static final String META_DOC_VALUES_CODEC = "Composite99DocValuesMetadata"; + + /** Filename extension for the composite index data doc values */ + public static final String DATA_DOC_VALUES_EXTENSION = "cidvd"; + + /** Filename extension for the composite index meta doc values */ + public static final String META_DOC_VALUES_EXTENSION = "cidvm"; + + /** Initial version for the Composite90DocValuesFormat */ + public static final int VERSION_START = 0; + + /** Current version for the Composite90DocValuesFormat */ + public static final int VERSION_CURRENT = VERSION_START; + // needed for SPI public Composite99DocValuesFormat() { this(new Lucene90DocValuesFormat(), null); diff --git a/server/src/main/java/org/opensearch/index/codec/composite/Composite99DocValuesReader.java b/server/src/main/java/org/opensearch/index/codec/composite/Composite99DocValuesReader.java index df5008a7f294e..2f032aa8903ec 100644 --- a/server/src/main/java/org/opensearch/index/codec/composite/Composite99DocValuesReader.java +++ b/server/src/main/java/org/opensearch/index/codec/composite/Composite99DocValuesReader.java @@ -8,19 +8,49 @@ package org.opensearch.index.codec.composite; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.codecs.DocValuesProducer; +import org.apache.lucene.codecs.lucene90.StarTree99DocValuesProducer; import org.apache.lucene.index.BinaryDocValues; +import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.index.SegmentReadState; import org.apache.lucene.index.SortedDocValues; import org.apache.lucene.index.SortedNumericDocValues; import org.apache.lucene.index.SortedSetDocValues; +import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.store.ChecksumIndexInput; +import org.apache.lucene.store.IndexInput; import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.common.util.io.IOUtils; +import org.opensearch.index.codec.composite.datacube.startree.StarTreeValues; +import org.opensearch.index.compositeindex.CompositeIndexMetadata; +import org.opensearch.index.compositeindex.datacube.Dimension; +import org.opensearch.index.compositeindex.datacube.MergeDimension; +import org.opensearch.index.compositeindex.datacube.Metric; +import org.opensearch.index.compositeindex.datacube.startree.StarTreeField; +import org.opensearch.index.compositeindex.datacube.startree.StarTreeFieldConfiguration; +import org.opensearch.index.compositeindex.datacube.startree.aggregators.MetricEntry; +import org.opensearch.index.compositeindex.datacube.startree.meta.StarTreeMetadata; +import org.opensearch.index.compositeindex.datacube.startree.node.OffHeapStarTree; +import org.opensearch.index.compositeindex.datacube.startree.node.StarTree; +import org.opensearch.index.compositeindex.datacube.startree.node.StarTreeNode; +import org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeHelper; import java.io.IOException; import java.util.ArrayList; +import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import static org.opensearch.index.compositeindex.CompositeIndexConstants.MAGIC_MARKER; /** * Reader for star tree index and star tree doc values from the segments @@ -29,11 +59,122 @@ */ @ExperimentalApi public class Composite99DocValuesReader extends DocValuesProducer implements CompositeIndexReader { - private DocValuesProducer delegate; + private static final Logger logger = LogManager.getLogger(CompositeIndexMetadata.class); + + private final DocValuesProducer delegate; + private IndexInput dataIn; + private ChecksumIndexInput metaIn; + private final Map starTreeMap = new LinkedHashMap<>(); + private final Map compositeIndexMetadataMap = new LinkedHashMap<>(); + private final Map compositeDocValuesProducerMap = new LinkedHashMap<>(); + private final List compositeFieldInfos = new ArrayList<>(); + private final SegmentReadState readState; - public Composite99DocValuesReader(DocValuesProducer producer, SegmentReadState state) throws IOException { + public Composite99DocValuesReader(DocValuesProducer producer, SegmentReadState readState) throws IOException { this.delegate = producer; - // TODO : read star tree files + this.readState = readState; + + String metaFileName = IndexFileNames.segmentFileName( + readState.segmentInfo.name, + readState.segmentSuffix, + Composite99DocValuesFormat.META_EXTENSION + ); + + String dataFileName = IndexFileNames.segmentFileName( + readState.segmentInfo.name, + readState.segmentSuffix, + Composite99DocValuesFormat.DATA_EXTENSION + ); + + boolean success = false; + try { + + // initialize meta input + dataIn = readState.directory.openInput(dataFileName, readState.context); + CodecUtil.checkIndexHeader( + dataIn, + Composite99DocValuesFormat.DATA_CODEC_NAME, + Composite99DocValuesFormat.VERSION_START, + Composite99DocValuesFormat.VERSION_CURRENT, + readState.segmentInfo.getId(), + readState.segmentSuffix + ); + + // initialize data input + metaIn = readState.directory.openChecksumInput(metaFileName, readState.context); + Throwable priorE = null; + try { + CodecUtil.checkIndexHeader( + metaIn, + Composite99DocValuesFormat.META_CODEC_NAME, + Composite99DocValuesFormat.VERSION_START, + Composite99DocValuesFormat.VERSION_CURRENT, + readState.segmentInfo.getId(), + readState.segmentSuffix + ); + + while (true) { + + //validate magic marker + long magicMarker = metaIn.readLong(); + if (magicMarker == -1) { + logger.info("EOF reached for composite index metadata"); + break; + } else if (magicMarker < 0) { + throw new CorruptIndexException("Unknown token encountered: " + magicMarker, metaIn); + } else if (MAGIC_MARKER != magicMarker) { + logger.error("Invalid composite field magic marker"); + throw new IOException("Invalid composite field magic marker"); + } + + // construct composite index metadata + CompositeIndexMetadata compositeIndexMetadata = new CompositeIndexMetadata(metaIn); + String compositeFieldName = compositeIndexMetadata.getCompositeFieldName(); + compositeFieldInfos.add( + new CompositeIndexFieldInfo(compositeFieldName, compositeIndexMetadata.getCompositeFieldType()) + ); + + switch (compositeIndexMetadata.getCompositeFieldType()) { + case STAR_TREE: + StarTreeMetadata starTreeMetadata = compositeIndexMetadata.getStarTreeMetadata(); + StarTree starTree = new OffHeapStarTree(dataIn, starTreeMetadata); + starTreeMap.put(compositeFieldName, starTree); + compositeIndexMetadataMap.put(compositeFieldName, compositeIndexMetadata); + + List dimensionFieldNumbers = starTreeMetadata.getDimensionFieldNumbers(); + List dimensions = new ArrayList<>(); + for (Integer fieldNumber : dimensionFieldNumbers) { + dimensions.add(readState.fieldInfos.fieldInfo(fieldNumber)); + } + + StarTree99DocValuesProducer starTreeDocValuesProducer = new StarTree99DocValuesProducer( + readState, + Composite99DocValuesFormat.DATA_DOC_VALUES_CODEC, + Composite99DocValuesFormat.DATA_DOC_VALUES_EXTENSION, + Composite99DocValuesFormat.META_DOC_VALUES_CODEC, + Composite99DocValuesFormat.META_DOC_VALUES_EXTENSION, + dimensions, + starTreeMetadata.getMetricEntries(), + compositeFieldName + ); + compositeDocValuesProducerMap.put(compositeFieldName, starTreeDocValuesProducer); + + break; + default: + throw new CorruptIndexException("Invalid composite field type found in the file", dataIn); + } + } + } catch (Throwable t) { + priorE = t; + } finally { + CodecUtil.checkFooter(metaIn, priorE); + } + success = true; + } finally { + if (success == false) { + IOUtils.closeWhileHandlingException(this); + } + } } @Override @@ -64,24 +205,107 @@ public SortedSetDocValues getSortedSet(FieldInfo field) throws IOException { @Override public void checkIntegrity() throws IOException { delegate.checkIntegrity(); - // Todo : check integrity of composite index related [star tree] files + CodecUtil.checksumEntireFile(dataIn); } @Override public void close() throws IOException { delegate.close(); - // Todo: close composite index related files [star tree] files + boolean success = false; + try { + IOUtils.close(metaIn, dataIn); + for (DocValuesProducer docValuesProducer : compositeDocValuesProducerMap.values()) { + IOUtils.close(docValuesProducer); + } + success = true; + } finally { + if (!success) { + IOUtils.closeWhileHandlingException(metaIn, dataIn); + } + starTreeMap.clear(); + compositeIndexMetadataMap.clear(); + compositeDocValuesProducerMap.clear(); + metaIn = null; + dataIn = null; + } } @Override public List getCompositeIndexFields() { - // todo : read from file formats and get the field names. - return new ArrayList<>(); + return compositeFieldInfos; } @Override - public CompositeIndexValues getCompositeIndexValues(CompositeIndexFieldInfo compositeIndexFieldInfo) throws IOException { - // TODO : read compositeIndexValues [starTreeValues] from star tree files - throw new UnsupportedOperationException(); + public CompositeIndexValues getCompositeIndexValues(CompositeIndexFieldInfo compositeIndexFieldInfo) throws + IOException { + + switch (compositeIndexFieldInfo.getType()) { + case STAR_TREE: + CompositeIndexMetadata compositeIndexMetadata = compositeIndexMetadataMap.get(compositeIndexFieldInfo.getField()); + StarTreeMetadata starTreeMetadata = compositeIndexMetadata.getStarTreeMetadata(); + Set skipStarNodeCreationInDimsFieldNumbers = starTreeMetadata.getSkipStarNodeCreationInDims(); + Set skipStarNodeCreationInDims = new HashSet<>(); + for (Integer fieldNumber : skipStarNodeCreationInDimsFieldNumbers) { + skipStarNodeCreationInDims.add(readState.fieldInfos.fieldInfo(fieldNumber).getName()); + } + + List dimensionFieldNumbers = starTreeMetadata.getDimensionFieldNumbers(); + List dimensions = new ArrayList<>(); + List mergeDimensions = new ArrayList<>(); + for (Integer fieldNumber : dimensionFieldNumbers) { + dimensions.add(readState.fieldInfos.fieldInfo(fieldNumber).getName()); + mergeDimensions.add(new MergeDimension(readState.fieldInfos.fieldInfo(fieldNumber).name)); + } + + Map starTreeMetricMap = new ConcurrentHashMap<>(); + for (MetricEntry metricEntry : starTreeMetadata.getMetricEntries()) { + String metricName = metricEntry.getMetricName(); + + Metric metric = starTreeMetricMap.computeIfAbsent(metricName, field -> new Metric(field, new ArrayList<>())); + metric.getMetrics().add(metricEntry.getMetricStat()); + } + List starTreeMetrics = new ArrayList<>(starTreeMetricMap.values()); + + StarTreeField starTreeField = new StarTreeField( + compositeIndexMetadata.getCompositeFieldName(), + mergeDimensions, + starTreeMetrics, + new StarTreeFieldConfiguration( + starTreeMetadata.getMaxLeafDocs(), + skipStarNodeCreationInDims, + starTreeMetadata.getStarTreeBuildMode() + ) + ); + StarTreeNode rootNode = starTreeMap.get(compositeIndexFieldInfo.getField()).getRoot(); + StarTree99DocValuesProducer starTree99DocValuesProducer = (StarTree99DocValuesProducer) compositeDocValuesProducerMap.get( + compositeIndexMetadata.getCompositeFieldName() + ); + Map dimensionsDocIdSetIteratorMap = new LinkedHashMap<>(); + Map metricsDocIdSetIteratorMap = new LinkedHashMap<>(); + + for (String dimension : dimensions) { + dimensionsDocIdSetIteratorMap.put( + dimension, + starTree99DocValuesProducer.getSortedNumeric( + StarTreeHelper.fullFieldNameForStarTreeDimensionsDocValues(starTreeField.getName(), dimension) + ) + ); + } + + for (MetricEntry metricEntry : starTreeMetadata.getMetricEntries()) { + String metricFullName = StarTreeHelper.fullFieldNameForStarTreeMetricsDocValues( + starTreeField.getName(), + metricEntry.getMetricName(), + metricEntry.getMetricStat().getTypeName() + ); + metricsDocIdSetIteratorMap.put(metricFullName, starTree99DocValuesProducer.getSortedNumeric(metricFullName)); + } + + return new StarTreeValues(starTreeField, rootNode, dimensionsDocIdSetIteratorMap, metricsDocIdSetIteratorMap); + + default: + throw new CorruptIndexException("Unsupported composite index field type: ", compositeIndexFieldInfo.getType().getName()); + } + } } diff --git a/server/src/main/java/org/opensearch/index/codec/composite/Composite99DocValuesWriter.java b/server/src/main/java/org/opensearch/index/codec/composite/Composite99DocValuesWriter.java index ec97053bdff05..c3f74706a346d 100644 --- a/server/src/main/java/org/opensearch/index/codec/composite/Composite99DocValuesWriter.java +++ b/server/src/main/java/org/opensearch/index/codec/composite/Composite99DocValuesWriter.java @@ -10,12 +10,17 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.codecs.DocValuesConsumer; import org.apache.lucene.codecs.DocValuesProducer; +import org.apache.lucene.codecs.lucene90.Composite99DocValuesConsumer; import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.index.MergeState; import org.apache.lucene.index.SegmentWriteState; +import org.apache.lucene.store.IndexOutput; import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.common.util.io.IOUtils; import org.opensearch.index.codec.composite.datacube.startree.StarTreeValues; import org.opensearch.index.compositeindex.datacube.startree.StarTreeField; import org.opensearch.index.compositeindex.datacube.startree.builder.StarTreesBuilder; @@ -23,7 +28,7 @@ import org.opensearch.index.mapper.MapperService; import java.io.IOException; -import java.util.Collections; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -45,11 +50,15 @@ public class Composite99DocValuesWriter extends DocValuesConsumer { AtomicReference mergeState = new AtomicReference<>(); private final Set compositeMappedFieldTypes; private final Set compositeFieldSet; + private DocValuesConsumer composite99DocValuesConsumer; + + public IndexOutput dataOut; + public IndexOutput metaOut; private final Map fieldProducerMap = new HashMap<>(); private static final Logger logger = LogManager.getLogger(Composite99DocValuesWriter.class); - public Composite99DocValuesWriter(DocValuesConsumer delegate, SegmentWriteState segmentWriteState, MapperService mapperService) { + public Composite99DocValuesWriter(DocValuesConsumer delegate, SegmentWriteState segmentWriteState, MapperService mapperService) throws IOException { this.delegate = delegate; this.state = segmentWriteState; @@ -59,6 +68,51 @@ public Composite99DocValuesWriter(DocValuesConsumer delegate, SegmentWriteState for (CompositeMappedFieldType type : compositeMappedFieldTypes) { compositeFieldSet.addAll(type.fields()); } + + boolean success = false; + try { + this.composite99DocValuesConsumer = new Composite99DocValuesConsumer( + segmentWriteState, + Composite99DocValuesFormat.DATA_DOC_VALUES_CODEC, + Composite99DocValuesFormat.DATA_DOC_VALUES_EXTENSION, + Composite99DocValuesFormat.META_DOC_VALUES_CODEC, + Composite99DocValuesFormat.META_DOC_VALUES_EXTENSION + ); + + String dataFileName = IndexFileNames.segmentFileName( + segmentWriteState.segmentInfo.name, + segmentWriteState.segmentSuffix, + Composite99DocValuesFormat.DATA_EXTENSION + ); + dataOut = segmentWriteState.directory.createOutput(dataFileName, segmentWriteState.context); + CodecUtil.writeIndexHeader( + dataOut, + Composite99DocValuesFormat.DATA_CODEC_NAME, + Composite99DocValuesFormat.VERSION_CURRENT, + segmentWriteState.segmentInfo.getId(), + segmentWriteState.segmentSuffix + ); + + String metaFileName = IndexFileNames.segmentFileName( + segmentWriteState.segmentInfo.name, + segmentWriteState.segmentSuffix, + Composite99DocValuesFormat.META_EXTENSION + ); + metaOut = segmentWriteState.directory.createOutput(metaFileName, segmentWriteState.context); + CodecUtil.writeIndexHeader( + metaOut, + Composite99DocValuesFormat.META_CODEC_NAME, + Composite99DocValuesFormat.VERSION_CURRENT, + segmentWriteState.segmentInfo.getId(), + segmentWriteState.segmentSuffix + ); + + success = true; + } finally { + if (success == false) { + IOUtils.closeWhileHandlingException(this); + } + } } @Override @@ -93,6 +147,26 @@ public void addSortedSetField(FieldInfo field, DocValuesProducer valuesProducer) @Override public void close() throws IOException { delegate.close(); + boolean success = false; + try { + if (metaOut != null) { + metaOut.writeLong(-1); // write EOF marker + CodecUtil.writeFooter(metaOut); // write checksum + } + if (dataOut != null) { + CodecUtil.writeFooter(dataOut); // write checksum + } + + success = true; + } finally { + if (success) { + IOUtils.close(dataOut, metaOut, composite99DocValuesConsumer); + } else { + IOUtils.closeWhileHandlingException(dataOut, metaOut, composite99DocValuesConsumer); + } + metaOut = dataOut = null; + composite99DocValuesConsumer = null; + } } private void createCompositeIndicesIfPossible(DocValuesProducer valuesProducer, FieldInfo field) throws IOException { @@ -104,9 +178,9 @@ private void createCompositeIndicesIfPossible(DocValuesProducer valuesProducer, // we have all the required fields to build composite fields if (compositeFieldSet.isEmpty()) { for (CompositeMappedFieldType mappedType : compositeMappedFieldTypes) { - if (mappedType.getCompositeIndexType().equals(CompositeMappedFieldType.CompositeFieldType.STAR_TREE)) { + if (mappedType instanceof StarTreeMapper.StarTreeFieldType) { try (StarTreesBuilder starTreesBuilder = new StarTreesBuilder(state, mapperService)) { - starTreesBuilder.build(fieldProducerMap); + starTreesBuilder.build(metaOut, dataOut, fieldProducerMap, composite99DocValuesConsumer); } } } @@ -122,6 +196,7 @@ public void merge(MergeState mergeState) throws IOException { /** * Merges composite fields from multiple segments + * * @param mergeState merge state */ private void mergeCompositeFields(MergeState mergeState) throws IOException { @@ -130,6 +205,7 @@ private void mergeCompositeFields(MergeState mergeState) throws IOException { /** * Merges star tree data fields from multiple segments + * * @param mergeState merge state */ private void mergeStarTreeFields(MergeState mergeState) throws IOException { @@ -151,8 +227,7 @@ private void mergeStarTreeFields(MergeState mergeState) throws IOException { if (fieldInfo.getType().equals(CompositeMappedFieldType.CompositeFieldType.STAR_TREE)) { CompositeIndexValues compositeIndexValues = reader.getCompositeIndexValues(fieldInfo); if (compositeIndexValues instanceof StarTreeValues) { - List fieldsList = starTreeSubsPerField.getOrDefault(fieldInfo.getField(), Collections.emptyList()); - + List fieldsList = starTreeSubsPerField.getOrDefault(fieldInfo.getField(), new ArrayList<>()); if (!starTreeFieldMap.containsKey(fieldInfo.getField())) { starTreeFieldMap.put(fieldInfo.getField(), ((StarTreeValues) compositeIndexValues).getStarTreeField()); } @@ -168,7 +243,7 @@ private void mergeStarTreeFields(MergeState mergeState) throws IOException { } } } - final StarTreesBuilder starTreesBuilder = new StarTreesBuilder(state, mapperService); - starTreesBuilder.buildDuringMerge(starTreeFieldMap, starTreeSubsPerField); + StarTreesBuilder starTreesBuilder = new StarTreesBuilder(state, mapperService); + starTreesBuilder.buildDuringMerge(metaOut, dataOut, starTreeFieldMap, starTreeSubsPerField, composite99DocValuesConsumer); } } diff --git a/server/src/main/java/org/opensearch/index/compositeindex/CompositeIndexConstants.java b/server/src/main/java/org/opensearch/index/compositeindex/CompositeIndexConstants.java new file mode 100644 index 0000000000000..defb4a26de260 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/compositeindex/CompositeIndexConstants.java @@ -0,0 +1,26 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.compositeindex; + +/** + * This class contains constants used in the Composite Index implementation. + */ +public class CompositeIndexConstants { + + /** + * The magic marker value used for sanity checks in the Composite Index implementation. + */ + public static final long MAGIC_MARKER = 0xC0950513F1E1DL; // Composite Field + + /** + * The version of the Composite Index implementation. + */ + public static final int VERSION = 1; + +} diff --git a/server/src/main/java/org/opensearch/index/compositeindex/CompositeIndexMetadata.java b/server/src/main/java/org/opensearch/index/compositeindex/CompositeIndexMetadata.java new file mode 100644 index 0000000000000..900d13cbb1f49 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/compositeindex/CompositeIndexMetadata.java @@ -0,0 +1,89 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.compositeindex; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.index.CorruptIndexException; +import org.apache.lucene.store.IndexInput; +import org.opensearch.index.compositeindex.datacube.startree.meta.StarTreeMetadata; +import org.opensearch.index.mapper.CompositeMappedFieldType; + +import java.io.IOException; + +import static org.opensearch.index.compositeindex.CompositeIndexConstants.VERSION; + +/** + * This class represents the metadata of a Composite Index, which includes information about + * the composite field name, type, and the specific metadata for the type of composite field + * (e.g., StarTree metadata). + * + * @opensearch.experimental + */ +public class CompositeIndexMetadata { + + private static final Logger logger = LogManager.getLogger(CompositeIndexMetadata.class); + private final String compositeFieldName; + private final CompositeMappedFieldType.CompositeFieldType compositeFieldType; + private final StarTreeMetadata starTreeMetadata; + + /** + * Constructs a CompositeIndexMetadata object from the provided IndexInput and magic marker. + * + * @param meta the IndexInput containing the metadata + * @throws IOException if an I/O error occurs while reading the metadata + */ + public CompositeIndexMetadata(IndexInput meta) throws IOException { + int version = meta.readInt(); + if (VERSION != version) { + logger.error("Invalid composite field version"); + throw new IOException("Invalid composite field version"); + } + + compositeFieldName = meta.readString(); + compositeFieldType = CompositeMappedFieldType.CompositeFieldType.fromName(meta.readString()); + + switch (compositeFieldType) { + // support for type of composite fields can be added in the future. + case STAR_TREE: + starTreeMetadata = new StarTreeMetadata(meta, compositeFieldName, compositeFieldType.getName()); + break; + default: + throw new CorruptIndexException("Invalid composite field type present in the file", meta); + } + + } + + /** + * Returns the star-tree metadata. + * + * @return the StarTreeMetadata + */ + public StarTreeMetadata getStarTreeMetadata() { + return starTreeMetadata; + } + + /** + * Returns the name of the composite field. + * + * @return the composite field name + */ + public String getCompositeFieldName() { + return compositeFieldName; + } + + /** + * Returns the type of the composite field. + * + * @return the composite field type + */ + public CompositeMappedFieldType.CompositeFieldType getCompositeFieldType() { + return compositeFieldType; + } +} diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/MergeDimension.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/MergeDimension.java new file mode 100644 index 0000000000000..1e15cae2e0029 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/MergeDimension.java @@ -0,0 +1,56 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.compositeindex.datacube; + +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.index.mapper.CompositeDataCubeFieldType; + +import java.io.IOException; +import java.util.Objects; + +/** + * Composite index merge dimension class + * + * @opensearch.experimental + */ +public class MergeDimension implements Dimension { + public static final String MERGE = "merge"; + private final String field; + + public MergeDimension(String field) { + this.field = field; + } + + public String getField() { + return field; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { + builder.startObject(); + builder.field(CompositeDataCubeFieldType.NAME, field); + builder.field(CompositeDataCubeFieldType.TYPE, MERGE); + builder.endObject(); + return builder; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + MergeDimension dimension = (MergeDimension) o; + return Objects.equals(field, dimension.getField()); + } + + @Override + public int hashCode() { + return Objects.hash(field); + } +} diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/MetricAggregatorInfo.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/MetricAggregatorInfo.java index 3895b53fe7466..7520b73bc1e3b 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/MetricAggregatorInfo.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/MetricAggregatorInfo.java @@ -79,7 +79,12 @@ public StarTreeNumericType getAggregatedValueType() { * @return field name with metric type and field */ public String toFieldName() { - return starFieldName + DELIMITER + field + DELIMITER + metricStat.getTypeName(); + return toFieldName(starFieldName, field, metricStat.getTypeName()); + + } + + public static String toFieldName(String starFieldName, String field, String metricName) { + return starFieldName + DELIMITER + field + DELIMITER + metricName; } @Override @@ -94,7 +99,7 @@ public boolean equals(Object obj) { } if (obj instanceof MetricAggregatorInfo) { MetricAggregatorInfo anotherPair = (MetricAggregatorInfo) obj; - return metricStat == anotherPair.metricStat && field.equals(anotherPair.field); + return metricStat.equals(anotherPair.metricStat) && field.equals(anotherPair.field); } return false; } diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/MetricEntry.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/MetricEntry.java new file mode 100644 index 0000000000000..683153ecde689 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/MetricEntry.java @@ -0,0 +1,35 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.compositeindex.datacube.startree.aggregators; + +import org.opensearch.index.compositeindex.datacube.MetricStat; + +/** + * Holds the pair of metric name and it's associated stat + * + * @opensearch.experimental + */ +public class MetricEntry { + + private final String metricName; + private final MetricStat metricStat; + + public MetricEntry(String metricName, MetricStat metricStat) { + this.metricName = metricName; + this.metricStat = metricStat; + } + + public String getMetricName() { + return metricName; + } + + public MetricStat getMetricStat() { + return metricStat; + } +} diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilder.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilder.java index 15bc8a624d01e..ffaa90cbaafda 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilder.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilder.java @@ -7,9 +7,24 @@ */ package org.opensearch.index.compositeindex.datacube.startree.builder; +import java.time.temporal.ChronoField; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.lucene.codecs.DocValuesConsumer; import org.apache.lucene.codecs.DocValuesProducer; +import org.apache.lucene.index.DocValuesType; +import org.apache.lucene.index.EmptyDocValuesProducer; +import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.IndexOptions; +import org.apache.lucene.index.SegmentWriteState; +import org.apache.lucene.index.SortedNumericDocValues; +import org.apache.lucene.index.SortedNumericDocValuesWriterHelper; +import org.apache.lucene.index.VectorEncoding; +import org.apache.lucene.index.VectorSimilarityFunction; +import org.opensearch.common.time.DateUtils; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.util.Counter; +import org.apache.lucene.util.NumericUtils; import org.apache.lucene.index.FieldInfo; import org.apache.lucene.index.SegmentWriteState; import org.opensearch.index.compositeindex.datacube.Dimension; @@ -31,12 +46,17 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeHelper.fullFieldNameForStarTreeDimensionsDocValues; +import static org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeHelper.fullFieldNameForStarTreeMetricsDocValues; import static org.opensearch.index.compositeindex.datacube.startree.utils.TreeNode.ALL; @@ -68,18 +88,30 @@ public abstract class BaseStarTreeBuilder implements StarTreeBuilder { private final StarTreeField starTreeField; private final MapperService mapperService; - private final SegmentWriteState state; + private final SegmentWriteState writeState; + + private final IndexOutput metaOut; + private final IndexOutput dataOut; /** * Reads all the configuration related to dimensions and metrics, builds a star-tree based on the different construction parameters. * - * @param starTreeField holds the configuration for the star tree - * @param state stores the segment write state - * @param mapperService helps to find the original type of the field + * @param starTreeField holds the configuration for the star tree + * @param writeState stores the segment write writeState + * @param mapperService helps to find the original type of the field */ - protected BaseStarTreeBuilder(StarTreeField starTreeField, SegmentWriteState state, MapperService mapperService) { + protected BaseStarTreeBuilder( + IndexOutput metaOut, + IndexOutput dataOut, + StarTreeField starTreeField, + SegmentWriteState writeState, + MapperService mapperService + ) { logger.debug("Building star tree : {}", starTreeField.getName()); + this.metaOut = metaOut; + this.dataOut = dataOut; + this.starTreeField = starTreeField; StarTreeFieldConfiguration starTreeFieldSpec = starTreeField.getStarTreeConfig(); @@ -87,9 +119,9 @@ protected BaseStarTreeBuilder(StarTreeField starTreeField, SegmentWriteState sta this.numDimensions = dimensionsSplitOrder.size(); this.skipStarNodeCreationForDimensions = new HashSet<>(); - this.totalSegmentDocs = state.segmentInfo.maxDoc(); + this.totalSegmentDocs = writeState.segmentInfo.maxDoc(); this.mapperService = mapperService; - this.state = state; + this.writeState = writeState; Set skipStarNodeCreationForDimensions = starTreeFieldSpec.getSkipStarNodeCreationInDims(); @@ -134,6 +166,263 @@ public List generateMetricAggregatorInfos(MapperService ma return metricAggregatorInfos; } + /** + * Generates the configuration required to perform aggregation for all the metrics on a field + * + * @return list of MetricAggregatorInfo + */ + public List getMetricReaders(SegmentWriteState state, Map fieldProducerMap) + throws IOException { + + List metricReaders = new ArrayList<>(); + for (Metric metric : this.starTreeField.getMetrics()) { + for (MetricStat metricType : metric.getMetrics()) { + SequentialDocValuesIterator metricReader; + FieldInfo metricFieldInfo = state.fieldInfos.fieldInfo(metric.getField()); + metricReader = new SequentialDocValuesIterator( + fieldProducerMap.get(metricFieldInfo.name).getSortedNumeric(metricFieldInfo) + ); + metricReaders.add(metricReader); + } + } + return metricReaders; + } + + /** + * Builds the star tree from the original segment documents + * + * @param fieldProducerMap contain s the docValues producer to get docValues associated with each field + * @param fieldNumberAcrossStarTrees + * @param starTreeDocValuesConsumer + * @throws IOException when we are unable to build star-tree + */ + public void build( + Map fieldProducerMap, + AtomicInteger fieldNumberAcrossStarTrees, + DocValuesConsumer starTreeDocValuesConsumer + ) throws IOException { + long startTime = System.currentTimeMillis(); + logger.debug("Star-tree build is a go with star tree field {}", starTreeField.getName()); + + List metricReaders = getMetricReaders(writeState, fieldProducerMap); + List dimensionsSplitOrder = starTreeField.getDimensionsOrder(); + SequentialDocValuesIterator[] dimensionReaders = new SequentialDocValuesIterator[dimensionsSplitOrder.size()]; + for (int i = 0; i < numDimensions; i++) { + String dimension = dimensionsSplitOrder.get(i).getField(); + FieldInfo dimensionFieldInfo = writeState.fieldInfos.fieldInfo(dimension); + dimensionReaders[i] = new SequentialDocValuesIterator( + fieldProducerMap.get(dimensionFieldInfo.name).getSortedNumeric(dimensionFieldInfo) + ); + } + Iterator starTreeDocumentIterator = sortAndAggregateSegmentDocuments( + totalSegmentDocs, + dimensionReaders, + metricReaders + ); + logger.debug("Sorting and aggregating star-tree in ms : {}", (System.currentTimeMillis() - startTime)); + build(starTreeDocumentIterator, fieldNumberAcrossStarTrees, starTreeDocValuesConsumer); + logger.debug("Finished Building star-tree in ms : {}", (System.currentTimeMillis() - startTime)); + } + + /** + * Builds the star tree using sorted and aggregated star-tree Documents + * + * @param starTreeDocumentIterator contains the sorted and aggregated documents + * @param fieldNumberAcrossStarTrees + * @param starTreeDocValuesConsumer + * @throws IOException when we are unable to build star-tree + */ + public void build( + Iterator starTreeDocumentIterator, + AtomicInteger fieldNumberAcrossStarTrees, + DocValuesConsumer starTreeDocValuesConsumer + ) throws IOException { + int numSegmentStarTreeDocument = totalSegmentDocs; + + while (starTreeDocumentIterator.hasNext()) { + appendToStarTree(starTreeDocumentIterator.next()); + } + int numStarTreeDocument = numStarTreeDocs; + logger.debug("Generated star tree docs : [{}] from segment docs : [{}]", numStarTreeDocument, numSegmentStarTreeDocument); + + if (numStarTreeDocs == 0) { + // serialize the star tree data + serializeStarTree(numSegmentStarTreeDocument); + return; + } + + constructStarTree(rootNode, 0, numStarTreeDocs); + int numStarTreeDocumentUnderStarNode = numStarTreeDocs - numStarTreeDocument; + logger.debug( + "Finished constructing star-tree, got [ {} ] tree nodes and [ {} ] starTreeDocument under star-node", + numStarTreeNodes, + numStarTreeDocumentUnderStarNode + ); + + createAggregatedDocs(rootNode); + int numAggregatedStarTreeDocument = numStarTreeDocs - numStarTreeDocument - numStarTreeDocumentUnderStarNode; + logger.debug("Finished creating aggregated documents : {}", numAggregatedStarTreeDocument); + + // Create doc values indices in disk + createSortedDocValuesIndices(starTreeDocValuesConsumer, fieldNumberAcrossStarTrees); + + // serialize star-tree + serializeStarTree(numSegmentStarTreeDocument); + } + + private long getTimeStampVal(final String fieldName, final long val) { + long roundedDate = 0; + long ratio = 0; + + switch (fieldName) { + + case "@timestamp": + ratio = ChronoField.MINUTE_OF_HOUR.getBaseUnit().getDuration().toMillis(); + roundedDate = DateUtils.roundFloor(val, ratio); + return roundedDate; + case "hour": + ratio = ChronoField.HOUR_OF_DAY.getBaseUnit().getDuration().toMillis(); + roundedDate = DateUtils.roundFloor(val, ratio); + return roundedDate; + case "day": + ratio = ChronoField.DAY_OF_MONTH.getBaseUnit().getDuration().toMillis(); + roundedDate = DateUtils.roundFloor(val, ratio); + return roundedDate; + case "month": + roundedDate = DateUtils.roundMonthOfYear(val); + return roundedDate; + case "year": + roundedDate = DateUtils.roundYear(val); + return roundedDate; + default: + return val; + } + } + + private void serializeStarTree(int numSegmentStarTreeDocument) throws IOException { + // serialize the star tree data + long dataFilePointer = dataOut.getFilePointer(); + long totalStarTreeDataLength = StarTreeBuilderUtils.serializeStarTree(dataOut, rootNode, numStarTreeNodes); + + // serialize the star tree meta + StarTreeBuilderUtils.serializeStarTreeMetadata( + metaOut, + starTreeField, + writeState, + metricAggregatorInfos, + numSegmentStarTreeDocument, + dataFilePointer, + totalStarTreeDataLength + ); + } + + private void createSortedDocValuesIndices(DocValuesConsumer docValuesConsumer, AtomicInteger fieldNumberAcrossStarTrees) + throws IOException { + List dimensionWriters = new ArrayList<>(); + List metricWriters = new ArrayList<>(); + FieldInfo[] dimensionFieldInfoList = new FieldInfo[starTreeField.getDimensionsOrder().size()]; + FieldInfo[] metricFieldInfoList = new FieldInfo[metricAggregatorInfos.size()]; + + for (int i = 0; i < dimensionFieldInfoList.length; i++) { + final FieldInfo fi = new FieldInfo( + fullFieldNameForStarTreeDimensionsDocValues(starTreeField.getName(), starTreeField.getDimensionsOrder().get(i).getField()), + fieldNumberAcrossStarTrees.getAndIncrement(), + false, + false, + true, + IndexOptions.DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS, + DocValuesType.SORTED_NUMERIC, + -1, + Collections.emptyMap(), + 0, + 0, + 0, + 0, + VectorEncoding.FLOAT32, + VectorSimilarityFunction.EUCLIDEAN, + false, + false + ); + dimensionFieldInfoList[i] = fi; + dimensionWriters.add(new SortedNumericDocValuesWriterHelper(fi, Counter.newCounter())); + } + for (int i = 0; i < metricAggregatorInfos.size(); i++) { + FieldInfo fi = new FieldInfo( + fullFieldNameForStarTreeMetricsDocValues( + starTreeField.getName(), + metricAggregatorInfos.get(i).getField(), + metricAggregatorInfos.get(i).getMetricStat().getTypeName() + ), + fieldNumberAcrossStarTrees.getAndIncrement(), + false, + false, + true, + IndexOptions.DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS, + DocValuesType.SORTED_NUMERIC, + -1, + Collections.emptyMap(), + 0, + 0, + 0, + 0, + VectorEncoding.FLOAT32, + VectorSimilarityFunction.EUCLIDEAN, + false, + false + ); + metricFieldInfoList[i] = fi; + metricWriters.add(new SortedNumericDocValuesWriterHelper(fi, Counter.newCounter())); + } + + for (int docId = 0; docId < numStarTreeDocs; docId++) { + StarTreeDocument starTreeDocument = getStarTreeDocument(docId); + for (int i = 0; i < starTreeDocument.dimensions.length; i++) { + Long val = starTreeDocument.dimensions[i]; + if (val != null) { + dimensionWriters.get(i).addValue(docId, val); + } + } + + for (int i = 0; i < starTreeDocument.metrics.length; i++) { + try { + switch (metricAggregatorInfos.get(i).getValueAggregators().getAggregatedValueType()) { + case LONG: + metricWriters.get(i).addValue(docId, (Long) starTreeDocument.metrics[i]); + break; + case DOUBLE: + metricWriters.get(i).addValue(docId, NumericUtils.doubleToSortableLong((Double) starTreeDocument.metrics[i])); + break; + default: + throw new IllegalStateException("Unknown metric doc value type"); + } + } catch (IllegalArgumentException e) { + logger.info("could not parse the value, exiting creation of star tree"); + } + } + } + + addStarTreeDocValueFields(docValuesConsumer, dimensionWriters, dimensionFieldInfoList, starTreeField.getDimensionsOrder().size()); + addStarTreeDocValueFields(docValuesConsumer, metricWriters, metricFieldInfoList, metricAggregatorInfos.size()); + } + + private void addStarTreeDocValueFields( + DocValuesConsumer docValuesConsumer, + List docValuesWriters, + FieldInfo[] fieldInfoList, + int fieldCount + ) throws IOException { + for (int i = 0; i < fieldCount; i++) { + final int increment = i; + DocValuesProducer docValuesProducer = new EmptyDocValuesProducer() { + @Override + public SortedNumericDocValues getSortedNumeric(FieldInfo field) { + return docValuesWriters.get(increment).getDocValues(); + } + }; + docValuesConsumer.addSortedNumericField(fieldInfoList[i], docValuesProducer); + } + } + /** * Adds a document to the star-tree. * @@ -151,6 +440,9 @@ public List generateMetricAggregatorInfos(MapperService ma */ public abstract StarTreeDocument getStarTreeDocument(int docId) throws IOException; + + public abstract StarTreeDocument getStarTreeDocumentForCreatingDocValues(int docId) throws IOException; + /** * Retrieves the list of star-tree documents in the star-tree. * @@ -171,8 +463,9 @@ public List generateMetricAggregatorInfos(MapperService ma * Sorts and aggregates all the documents in the segment as per the configuration, and returns a star-tree document iterator for all the * aggregated star-tree documents. * + * @param numDocs number of documents in the given segment * @param dimensionReaders List of docValues readers to read dimensions from the segment - * @param metricReaders List of docValues readers to read metrics from the segment + * @param metricReaders List of docValues readers to read metrics from the segment * @return Iterator for the aggregated star-tree document */ public abstract Iterator sortAndAggregateSegmentDocuments( diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/OnHeapStarTreeBuilder.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/OnHeapStarTreeBuilder.java index f5297de1a5e2b..a2be7a8b1c0c3 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/OnHeapStarTreeBuilder.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/OnHeapStarTreeBuilder.java @@ -7,8 +7,10 @@ */ package org.opensearch.index.compositeindex.datacube.startree.builder; +import org.apache.lucene.codecs.DocValuesConsumer; import org.apache.lucene.index.SegmentWriteState; import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.store.IndexOutput; import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.index.codec.composite.datacube.startree.StarTreeValues; import org.opensearch.index.compositeindex.datacube.Dimension; @@ -24,6 +26,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.atomic.AtomicInteger; /** * On heap single tree builder @@ -42,8 +45,14 @@ public class OnHeapStarTreeBuilder extends BaseStarTreeBuilder { * @param segmentWriteState segment write state * @param mapperService helps with the numeric type of field */ - public OnHeapStarTreeBuilder(StarTreeField starTreeField, SegmentWriteState segmentWriteState, MapperService mapperService) { - super(starTreeField, segmentWriteState, mapperService); + public OnHeapStarTreeBuilder( + IndexOutput metaOut, + IndexOutput dataOut, + StarTreeField starTreeField, + SegmentWriteState segmentWriteState, + MapperService mapperService + ) throws IOException { + super(metaOut, dataOut, starTreeField, segmentWriteState, mapperService); } @Override @@ -87,8 +96,12 @@ public Iterator sortAndAggregateSegmentDocuments( } @Override - public void build(List starTreeValuesSubs) throws IOException { - build(mergeStarTrees(starTreeValuesSubs)); + public void build( + List starTreeValuesSubs, + AtomicInteger fieldNumberAcrossStarTrees, + DocValuesConsumer starTreeDocValuesConsumer + ) throws IOException { + build(mergeStarTrees(starTreeValuesSubs), fieldNumberAcrossStarTrees, starTreeDocValuesConsumer); } /** @@ -166,8 +179,46 @@ StarTreeDocument[] mergeStarTreeValues(List starTreeValuesSubs) return starTreeDocuments.toArray(starTreeDocumentsArr); } - Iterator sortAndAggregateStarTreeDocuments(StarTreeDocument[] starTreeDocuments) { - return sortAndAggregateStarTreeDocuments(starTreeDocuments, false); + @Override + public StarTreeDocument getStarTreeDocument(int docId) throws IOException { + return starTreeDocuments.get(docId); + } + + @Override + public StarTreeDocument getStarTreeDocumentForCreatingDocValues(int docId) throws IOException { + return starTreeDocuments.get(docId); + } + + @Override + public List getStarTreeDocuments() { + return starTreeDocuments; + } + + @Override + public Long getDimensionValue(int docId, int dimensionId) throws IOException { + return starTreeDocuments.get(docId).dimensions[dimensionId]; + } + + /** + * Sorts and aggregates all the documents of the segment based on dimension and metrics configuration + * + * @param numDocs number of documents in the given segment + * @param dimensionReaders List of docValues readers to read dimensions from the segment + * @param metricReaders List of docValues readers to read metrics from the segment + * @return Iterator of star-tree documents + * + */ + @Override + public Iterator sortAndAggregateSegmentDocuments( + int numDocs, + SequentialDocValuesIterator[] dimensionReaders, + List metricReaders + ) throws IOException { + StarTreeDocument[] starTreeDocuments = new StarTreeDocument[numDocs]; + for (int currentDocId = 0; currentDocId < numDocs; currentDocId++) { + starTreeDocuments[currentDocId] = getSegmentStarTreeDocument(currentDocId, dimensionReaders, metricReaders); + } + return sortAndAggregateStarTreeDocuments(starTreeDocuments); } /** diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreeBuilder.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreeBuilder.java index 94c9c9f2efb18..7e4159d035113 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreeBuilder.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreeBuilder.java @@ -8,6 +8,7 @@ package org.opensearch.index.compositeindex.datacube.startree.builder; +import org.apache.lucene.codecs.DocValuesConsumer; import org.apache.lucene.codecs.DocValuesProducer; import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.index.codec.composite.datacube.startree.StarTreeValues; @@ -16,6 +17,7 @@ import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; /** * A star-tree builder that builds a single star-tree. @@ -28,16 +30,28 @@ public interface StarTreeBuilder extends Closeable { * Builds the star tree from the original segment documents * * @param fieldProducerMap contains the docValues producer to get docValues associated with each field + * @param fieldNumberAcrossStarTrees maintains the unique field number across the fields in the star tree + * @param starTreeDocValuesConsumer * @throws IOException when we are unable to build star-tree */ - void build(Map fieldProducerMap) throws IOException; + void build( + Map fieldProducerMap, + AtomicInteger fieldNumberAcrossStarTrees, + DocValuesConsumer starTreeDocValuesConsumer + ) throws IOException; /** * Builds the star tree using StarTree values from multiple segments * * @param starTreeValuesSubs contains the star tree values from multiple segments + * @param fieldNumberAcrossStarTrees maintains the unique field number across the fields in the star tree + * @param starTreeDocValuesConsumer * @throws IOException when we are unable to build star-tree */ - void build(List starTreeValuesSubs) throws IOException; + void build( + List starTreeValuesSubs, + AtomicInteger fieldNumberAcrossStarTrees, + DocValuesConsumer starTreeDocValuesConsumer + ) throws IOException; } diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreesBuilder.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreesBuilder.java index 3cf8c665028a5..de489de3d616a 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreesBuilder.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreesBuilder.java @@ -10,8 +10,10 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.lucene.codecs.DocValuesConsumer; import org.apache.lucene.codecs.DocValuesProducer; import org.apache.lucene.index.SegmentWriteState; +import org.apache.lucene.store.IndexOutput; import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.index.codec.composite.datacube.startree.StarTreeValues; import org.opensearch.index.compositeindex.datacube.startree.StarTreeField; @@ -25,6 +27,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; /** * Builder to construct star-trees based on multiple star-tree fields. @@ -39,6 +42,7 @@ public class StarTreesBuilder implements Closeable { private final List starTreeFields; private final SegmentWriteState state; private final MapperService mapperService; + private AtomicInteger fieldNumberAcrossStarTrees; public StarTreesBuilder(SegmentWriteState segmentWriteState, MapperService mapperService) { List starTreeFields = new ArrayList<>(); @@ -58,12 +62,18 @@ public StarTreesBuilder(SegmentWriteState segmentWriteState, MapperService mappe this.starTreeFields = starTreeFields; this.state = segmentWriteState; this.mapperService = mapperService; + this.fieldNumberAcrossStarTrees = new AtomicInteger(); } /** * Builds the star-trees. */ - public void build(Map fieldProducerMap) throws IOException { + public void build( + IndexOutput metaOut, + IndexOutput dataOut, + Map fieldProducerMap, + DocValuesConsumer starTreeDocValuesConsumer + ) throws IOException { if (starTreeFields.isEmpty()) { logger.debug("no star-tree fields found, returning from star-tree builder"); return; @@ -76,8 +86,8 @@ public void build(Map fieldProducerMap) throws IOExce // Build all star-trees for (int i = 0; i < numStarTrees; i++) { StarTreeField starTreeField = starTreeFields.get(i); - try (StarTreeBuilder starTreeBuilder = getSingleTreeBuilder(starTreeField, state, mapperService)) { - starTreeBuilder.build(fieldProducerMap); + try (StarTreeBuilder starTreeBuilder = getSingleTreeBuilder(metaOut, dataOut, starTreeField, state, mapperService)) { + starTreeBuilder.build(fieldProducerMap, fieldNumberAcrossStarTrees, starTreeDocValuesConsumer); } } logger.debug("Took {} ms to build {} star-trees with star-tree fields", System.currentTimeMillis() - startTime, numStarTrees); @@ -91,18 +101,25 @@ public void close() throws IOException { /** * Merges star tree fields from multiple segments * + * @param metaOut + * @param dataOut * @param starTreeFieldMap StarTreeField configuration per field * @param starTreeValuesSubsPerField starTreeValuesSubs per field + * @param starTreeDocValuesConsumer */ public void buildDuringMerge( + IndexOutput metaOut, + IndexOutput dataOut, final Map starTreeFieldMap, - final Map> starTreeValuesSubsPerField + final Map> starTreeValuesSubsPerField, + DocValuesConsumer starTreeDocValuesConsumer ) throws IOException { for (Map.Entry> entry : starTreeValuesSubsPerField.entrySet()) { List starTreeValuesList = entry.getValue(); StarTreeField starTreeField = starTreeFieldMap.get(entry.getKey()); - StarTreeBuilder builder = getSingleTreeBuilder(starTreeField, state, mapperService); - builder.build(starTreeValuesList); + StarTreeBuilder builder = getSingleTreeBuilder(metaOut, dataOut, starTreeField, state, mapperService); + builder.build(starTreeValuesList, fieldNumberAcrossStarTrees, starTreeDocValuesConsumer); + builder.close(); builder.close(); } } @@ -110,11 +127,16 @@ public void buildDuringMerge( /** * Get star-tree builder based on build mode. */ - StarTreeBuilder getSingleTreeBuilder(StarTreeField starTreeField, SegmentWriteState state, MapperService mapperService) - throws IOException { + StarTreeBuilder getSingleTreeBuilder( + IndexOutput metaOut, + IndexOutput dataOut, + StarTreeField starTreeField, + SegmentWriteState state, + MapperService mapperService + ) throws IOException { switch (starTreeField.getStarTreeConfig().getBuildMode()) { case ON_HEAP: - return new OnHeapStarTreeBuilder(starTreeField, state, mapperService); + return new OnHeapStarTreeBuilder(metaOut, dataOut, starTreeField, state, mapperService); case OFF_HEAP: // TODO // return new OffHeapStarTreeBuilder(starTreeField, state, mapperService); diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/meta/StarTreeMetadata.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/meta/StarTreeMetadata.java new file mode 100644 index 0000000000000..e328cb061e3f1 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/meta/StarTreeMetadata.java @@ -0,0 +1,228 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.index.compositeindex.datacube.startree.meta; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.index.CorruptIndexException; +import org.apache.lucene.store.IndexInput; +import org.opensearch.index.compositeindex.datacube.MetricStat; +import org.opensearch.index.compositeindex.datacube.startree.StarTreeFieldConfiguration; +import org.opensearch.index.compositeindex.datacube.startree.aggregators.MetricEntry; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * Holds the associated metadata for the building of star-tree + * + * @opensearch.experimental + */ +public class StarTreeMetadata implements TreeMetadata { + private static final Logger logger = LogManager.getLogger(TreeMetadata.class); + private final IndexInput meta; + private final String starTreeFieldName; + private final String starTreeFieldType; + private final List dimensionFieldNumbers; + private final List metricEntries; + private final Integer segmentAggregatedDocCount; + private final Integer maxLeafDocs; + private final Set skipStarNodeCreationInDims; + private final StarTreeFieldConfiguration.StarTreeBuildMode starTreeBuildMode; + private final long dataStartFilePointer; + private final long dataLength; + + public StarTreeMetadata(IndexInput meta, String compositeFieldName, String compositeFieldType) throws IOException { + this.meta = meta; + try { + this.starTreeFieldName = compositeFieldName; + this.starTreeFieldType = compositeFieldType; + this.dimensionFieldNumbers = readStarTreeDimensions(); + this.metricEntries = readMetricEntries(); + this.segmentAggregatedDocCount = readSegmentAggregatedDocCount(); + this.maxLeafDocs = readMaxLeafDocs(); + this.skipStarNodeCreationInDims = readSkipStarNodeCreationInDims(); + this.starTreeBuildMode = readBuildMode(); + this.dataStartFilePointer = readDataStartFilePointer(); + this.dataLength = readDataLength(); + } catch (Exception e) { + logger.error("Unable to read star-tree metadata from the file"); + throw new CorruptIndexException("Unable to read star-tree metadata from the file", meta); + } + } + + @Override + public int readDimensionsCount() throws IOException { + return meta.readInt(); + } + + @Override + public List readStarTreeDimensions() throws IOException { + int dimensionCount = readDimensionsCount(); + List dimensionFieldNumbers = new ArrayList<>(); + + for (int i = 0; i < dimensionCount; i++) { + dimensionFieldNumbers.add(meta.readInt()); + } + + return dimensionFieldNumbers; + } + + @Override + public int readMetricsCount() throws IOException { + return meta.readInt(); + } + + @Override + public List readMetricEntries() throws IOException { + int metricCount = readMetricsCount(); + List metricEntries = new ArrayList<>(); + + for (int i = 0; i < metricCount; i++) { + String metricName = meta.readString(); + String metricStat = meta.readString(); + metricEntries.add(new MetricEntry(metricName, MetricStat.fromTypeName(metricStat))); + } + + return metricEntries; + } + + @Override + public int readSegmentAggregatedDocCount() throws IOException { + return meta.readInt(); + } + + @Override + public int readMaxLeafDocs() throws IOException { + return meta.readInt(); + } + + @Override + public int readSkipStarNodeCreationInDimsCount() throws IOException { + return meta.readInt(); + } + + @Override + public Set readSkipStarNodeCreationInDims() throws IOException { + + int skipStarNodeCreationInDimsCount = readSkipStarNodeCreationInDimsCount(); + Set skipStarNodeCreationInDims = new HashSet<>(); + for (int i = 0; i < skipStarNodeCreationInDimsCount; i++) { + skipStarNodeCreationInDims.add(meta.readInt()); + } + return skipStarNodeCreationInDims; + } + + @Override + public StarTreeFieldConfiguration.StarTreeBuildMode readBuildMode() throws IOException { + return StarTreeFieldConfiguration.StarTreeBuildMode.fromTypeName(meta.readString()); + } + + @Override + public long readDataStartFilePointer() throws IOException { + return meta.readLong(); + } + + @Override + public long readDataLength() throws IOException { + return meta.readLong(); + } + + /** + * Returns the name of the star-tree field. + * + * @return star-tree field name + */ + public String getStarTreeFieldName() { + return starTreeFieldName; + } + + /** + * Returns the type of the star tree field. + * + * @return star-tree field type + */ + public String getStarTreeFieldType() { + return starTreeFieldType; + } + + /** + * Returns the list of dimension field numbers. + * + * @return star-tree dimension field numbers + */ + public List getDimensionFieldNumbers() { + return dimensionFieldNumbers; + } + + /** + * Returns the list of metric entries. + * + * @return star-tree metric entries + */ + public List getMetricEntries() { + return metricEntries; + } + + /** + * Returns the aggregated document count for the star-tree. + * + * @return the aggregated document count for the star-tree. + */ + public Integer getSegmentAggregatedDocCount() { + return segmentAggregatedDocCount; + } + + /** + * Returns the max leaf docs for the star-tree. + * + * @return the max leaf docs. + */ + public Integer getMaxLeafDocs() { + return maxLeafDocs; + } + + /** + * Returns the set of dimensions for which star node will not be created in the star-tree. + * + * @return the set of dimensions. + */ + public Set getSkipStarNodeCreationInDims() { + return skipStarNodeCreationInDims; + } + + /** + * Returns the build mode for the star-tree. + * + * @return the star-tree build mode. + */ + public StarTreeFieldConfiguration.StarTreeBuildMode getStarTreeBuildMode() { + return starTreeBuildMode; + } + + /** + * Returns the file pointer to the start of the star-tree data. + * + * @return start file pointer for star-tree data + */ + public long getDataStartFilePointer() { + return dataStartFilePointer; + } + + /** + * Returns the length of star-tree data + * + * @return star-tree length + */ + public long getDataLength() { + return dataLength; + } +} diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/meta/TreeMetadata.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/meta/TreeMetadata.java new file mode 100644 index 0000000000000..9859afad95a74 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/meta/TreeMetadata.java @@ -0,0 +1,112 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.compositeindex.datacube.startree.meta; + +import org.opensearch.index.compositeindex.datacube.startree.StarTreeFieldConfiguration; +import org.opensearch.index.compositeindex.datacube.startree.aggregators.MetricEntry; + +import java.io.IOException; +import java.util.List; +import java.util.Set; + +/** + * An interface for metadata of the star-tree + * + * @opensearch.experimental + */ +public interface TreeMetadata { + + /** + * Reads the count of dimensions in the star-tree. + * + * @return the count of dimensions + * @throws IOException if an I/O error occurs while reading the dimensions count + */ + int readDimensionsCount() throws IOException; + + /** + * Reads the list of dimension ordinals in the star-tree. + * + * @return the list of dimension ordinals + * @throws IOException if an I/O error occurs while reading the dimension ordinals + */ + List readStarTreeDimensions() throws IOException; + + /** + * Reads the count of metrics in the star-tree. + * + * @return the count of metrics + * @throws IOException if an I/O error occurs while reading the metrics count + */ + int readMetricsCount() throws IOException; + + /** + * Reads the list of metric entries in the star-tree. + * + * @return the list of metric entries + * @throws IOException if an I/O error occurs while reading the metric entries + */ + List readMetricEntries() throws IOException; + + /** + * Reads the aggregated document count for the segment in the star-tree. + * + * @return the aggregated document count for the segment + * @throws IOException if an I/O error occurs while reading the aggregated document count + */ + int readSegmentAggregatedDocCount() throws IOException; + + /** + * Reads the max leaf docs for the star-tree. + * + * @return the max leaf docs for the star-tree + * @throws IOException if an I/O error occurs while reading the max leaf docs + */ + int readMaxLeafDocs() throws IOException; + + /** + * Reads the count of dimensions where star node will not be created in the star-tree. + * + * @return the count of dimensions + * @throws IOException if an I/O error occurs while reading the skip star node dimensions count + */ + int readSkipStarNodeCreationInDimsCount() throws IOException; + + /** + * Reads the list of dimensions field numbers to be skipped for star node creation in the star-tree. + * + * @return the set of dimensions field numbers to be skipped for star node creation. + * @throws IOException if an I/O error occurs while reading the dimensions + */ + Set readSkipStarNodeCreationInDims() throws IOException; + + /** + * Reads the build mode for the star-tree. + * + * @return the star-tree build mode + * @throws IOException if an I/O error occurs while reading the build mode + */ + StarTreeFieldConfiguration.StarTreeBuildMode readBuildMode() throws IOException; + + /** + * Reads the file pointer to the start of the star-tree data. + * + * @return the file pointer to the start of the star-tree data + * @throws IOException if an I/O error occurs while reading the star-tree data start file pointer + */ + long readDataStartFilePointer() throws IOException; + + /** + * Reads the length of the data of the star-tree. + * + * @return the length of the data + * @throws IOException if an I/O error occurs while reading the data length + */ + long readDataLength() throws IOException; +} diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/meta/package-info.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/meta/package-info.java new file mode 100644 index 0000000000000..568cacea4b59a --- /dev/null +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/meta/package-info.java @@ -0,0 +1,14 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Meta package for star tree + * + * @opensearch.experimental + */ +package org.opensearch.index.compositeindex.datacube.startree.meta; diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/OffHeapStarTree.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/OffHeapStarTree.java new file mode 100644 index 0000000000000..cfc280643d202 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/OffHeapStarTree.java @@ -0,0 +1,65 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.index.compositeindex.datacube.startree.node; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.RandomAccessInput; +import org.opensearch.index.compositeindex.datacube.startree.meta.StarTreeMetadata; + +import java.io.IOException; + +import static org.opensearch.index.compositeindex.CompositeIndexConstants.MAGIC_MARKER; +import static org.opensearch.index.compositeindex.CompositeIndexConstants.VERSION; + +/** + * Off heap implementation of the star-tree. + * + * @opensearch.experimental + */ +public class OffHeapStarTree implements StarTree { + private static final Logger logger = LogManager.getLogger(OffHeapStarTree.class); + private final OffHeapStarTreeNode root; + private final Integer numNodes; + + public OffHeapStarTree(IndexInput data, StarTreeMetadata starTreeMetadata) throws IOException { + long magicMarker = data.readLong(); + if (MAGIC_MARKER != magicMarker) { + logger.error("Invalid magic marker"); + throw new IOException("Invalid magic marker"); + } + int version = data.readInt(); + if (VERSION != version) { + logger.error("Invalid star tree version"); + throw new IOException("Invalid version"); + } + numNodes = data.readInt(); // num nodes + + RandomAccessInput in = data.randomAccessSlice( + data.getFilePointer(), + starTreeMetadata.getDataLength() + ); + root = new OffHeapStarTreeNode(in, 0); + } + + @Override + public StarTreeNode getRoot() { + return root; + } + + /** + * Returns the number of nodes in star-tree + * + * @return number of nodes in te star-tree + */ + public Integer getNumNodes() { + return numNodes; + } + +} diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/OffHeapStarTreeNode.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/OffHeapStarTreeNode.java new file mode 100644 index 0000000000000..ec30f14f044d4 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/OffHeapStarTreeNode.java @@ -0,0 +1,184 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.index.compositeindex.datacube.startree.node; + +import org.apache.lucene.store.RandomAccessInput; + +import java.io.IOException; +import java.util.Iterator; + +/** + * Off heap implementation of {@link StarTreeNode} + * + * @opensearch.experimental + */ +public class OffHeapStarTreeNode implements StarTreeNode { + public static final int NUM_INT_SERIALIZABLE_FIELDS = 6; + public static final int NUM_LONG_SERIALIZABLE_FIELDS = 1; + public static final int NUM_BYTE_SERIALIZABLE_FIELDS = 1; + public static final long SERIALIZABLE_DATA_SIZE_IN_BYTES = (Integer.BYTES * NUM_INT_SERIALIZABLE_FIELDS) + (Long.BYTES + * NUM_LONG_SERIALIZABLE_FIELDS) + (NUM_BYTE_SERIALIZABLE_FIELDS * Byte.BYTES); + private static final int DIMENSION_ID_OFFSET = 0; + private static final int DIMENSION_VALUE_OFFSET = DIMENSION_ID_OFFSET + Integer.BYTES; + private static final int START_DOC_ID_OFFSET = DIMENSION_VALUE_OFFSET + Long.BYTES; + private static final int END_DOC_ID_OFFSET = START_DOC_ID_OFFSET + Integer.BYTES; + private static final int AGGREGATE_DOC_ID_OFFSET = END_DOC_ID_OFFSET + Integer.BYTES; + private static final int IS_STAR_NODE_OFFSET = AGGREGATE_DOC_ID_OFFSET + Byte.BYTES; + private static final int FIRST_CHILD_ID_OFFSET = IS_STAR_NODE_OFFSET + Integer.BYTES; + private static final int LAST_CHILD_ID_OFFSET = FIRST_CHILD_ID_OFFSET + Integer.BYTES; + + public static final int INVALID_ID = -1; + + private final int nodeId; + private final int firstChildId; + + RandomAccessInput in; + + public OffHeapStarTreeNode(RandomAccessInput in, int nodeId) throws IOException { + this.in = in; + this.nodeId = nodeId; + firstChildId = getInt(FIRST_CHILD_ID_OFFSET); + } + + private int getInt(int fieldOffset) throws IOException { + return in.readInt(nodeId * SERIALIZABLE_DATA_SIZE_IN_BYTES + fieldOffset); + } + + private long getLong(int fieldOffset) throws IOException { + return in.readLong(nodeId * SERIALIZABLE_DATA_SIZE_IN_BYTES + fieldOffset); + } + + private byte getByte(int fieldOffset) throws IOException { + return in.readByte(nodeId * SERIALIZABLE_DATA_SIZE_IN_BYTES + fieldOffset); + } + + @Override + public int getDimensionId() throws IOException { + return getInt(DIMENSION_ID_OFFSET); + } + + @Override + public long getDimensionValue() throws IOException { + return getLong(DIMENSION_VALUE_OFFSET); + } + + @Override + public int getChildDimensionId() throws IOException { + if (firstChildId == INVALID_ID) { + return INVALID_ID; + } else { + return in.readInt(firstChildId * SERIALIZABLE_DATA_SIZE_IN_BYTES); + } + } + + @Override + public int getStartDocId() throws IOException { + return getInt(START_DOC_ID_OFFSET); + } + + @Override + public int getEndDocId() throws IOException { + return getInt(END_DOC_ID_OFFSET); + } + + @Override + public int getAggregatedDocId() throws IOException { + return getInt(AGGREGATE_DOC_ID_OFFSET); + } + + @Override + public int getNumChildren() throws IOException { + if (firstChildId == INVALID_ID) { + return 0; + } else { + return getInt(LAST_CHILD_ID_OFFSET) - firstChildId + 1; + } + } + + @Override + public boolean isLeaf() { + return firstChildId == INVALID_ID; + } + + @Override + public boolean isStarNode() throws IOException { + return getByte(IS_STAR_NODE_OFFSET) != 0; + } + + @Override + public StarTreeNode getChildForDimensionValue(long dimensionValue) throws IOException { + // there will be no children for leaf nodes + if (isLeaf()) { + return null; + } + + // Specialize star node for performance + if (dimensionValue == ALL) { + return handleStarNode(); + } + + return binarySearchChild(dimensionValue); + } + + private OffHeapStarTreeNode handleStarNode() throws IOException { + OffHeapStarTreeNode firstNode = new OffHeapStarTreeNode(in, firstChildId); + if (firstNode.getDimensionValue() == ALL) { + return firstNode; + } else { + return null; + } + } + + private OffHeapStarTreeNode binarySearchChild(long dimensionValue) throws IOException { + // Binary search to find child node + int low = firstChildId; + int high = getInt(LAST_CHILD_ID_OFFSET); + + while (low <= high) { + int mid = low + (high - low) / 2; + OffHeapStarTreeNode midNode = new OffHeapStarTreeNode(in, mid); + long midNodeDimensionValue = midNode.getDimensionValue(); + + if (midNodeDimensionValue == dimensionValue) { + return midNode; + } else if (midNodeDimensionValue < dimensionValue) { + low = mid + 1; + } else { + high = mid - 1; + } + } + return null; + } + + @Override + public Iterator getChildrenIterator() throws IOException { + return new Iterator<>() { + private int currentChildId = firstChildId; + private final int lastChildId = getInt(LAST_CHILD_ID_OFFSET); + + @Override + public boolean hasNext() { + return currentChildId <= lastChildId; + } + + @Override + public OffHeapStarTreeNode next() { + try { + return new OffHeapStarTreeNode(in, currentChildId++); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } +} diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/StarTree.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/StarTree.java new file mode 100644 index 0000000000000..e21dc225a4c8f --- /dev/null +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/StarTree.java @@ -0,0 +1,23 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.index.compositeindex.datacube.startree.node; + +/** + * Interface for star-tree. + * + * @opensearch.experimental + */ +public interface StarTree { + + /** + * Fetches the root node of the star-tree. + * @return the root of the star-tree + */ + StarTreeNode getRoot(); + +} diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/package-info.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/package-info.java index 516d5b5a012ab..19d12bc6318d7 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/package-info.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/package-info.java @@ -8,5 +8,7 @@ /** * Holds classes associated with star tree node + * + * @opensearch.experimental */ package org.opensearch.index.compositeindex.datacube.startree.node; diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeBuilderUtils.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeBuilderUtils.java new file mode 100644 index 0000000000000..cb0fc100db546 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeBuilderUtils.java @@ -0,0 +1,57 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.index.compositeindex.datacube.startree.utils; + +import org.apache.lucene.index.SegmentWriteState; +import org.apache.lucene.store.IndexOutput; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.index.compositeindex.datacube.startree.StarTreeField; +import org.opensearch.index.compositeindex.datacube.startree.aggregators.MetricAggregatorInfo; +import org.opensearch.index.mapper.CompositeMappedFieldType; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +/** + * Util class for building star tree + * + * @opensearch.experimental + */ +public class StarTreeBuilderUtils { + + private StarTreeBuilderUtils() {} + + public static final int ALL = -1; + + public static long serializeStarTree(IndexOutput dataOut, TreeNode rootNode, int numNodes) throws IOException { + return StarTreeDataSerializer.serializeStarTree(dataOut, rootNode, numNodes); + } + + public static void serializeStarTreeMetadata( + IndexOutput metaOut, + StarTreeField starTreeField, + SegmentWriteState writeState, + List metricAggregatorInfos, + Integer segmentAggregatedCount, + long dataFilePointer, + long dataFileLength + ) throws IOException { + StarTreeMetaSerializer.serializeStarTreeMetadata( + metaOut, + CompositeMappedFieldType.CompositeFieldType.STAR_TREE, + starTreeField, + writeState, + metricAggregatorInfos, + segmentAggregatedCount, + dataFilePointer, + dataFileLength + ); + } + +} diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeDataSerializer.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeDataSerializer.java new file mode 100644 index 0000000000000..54059458a6d0d --- /dev/null +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeDataSerializer.java @@ -0,0 +1,138 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.compositeindex.datacube.startree.utils; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.store.IndexOutput; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; + +import static org.opensearch.index.compositeindex.CompositeIndexConstants.MAGIC_MARKER; +import static org.opensearch.index.compositeindex.CompositeIndexConstants.VERSION; +import static org.opensearch.index.compositeindex.datacube.startree.node.OffHeapStarTreeNode.SERIALIZABLE_DATA_SIZE_IN_BYTES; +import static org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeBuilderUtils.ALL; + +/** + * Utility class for serializing a star-tree data structure. + * + * @opensearch.experimental + */ +public class StarTreeDataSerializer { + + private static final Logger logger = LogManager.getLogger(StarTreeDataSerializer.class); + + /** + * Serializes the star-tree data structure. + * + * @param indexOutput the IndexOutput to write the star-tree data + * @param rootNode the root node of the star-tree + * @param numNodes the total number of nodes in the star-tree + * @return the total size in bytes of the serialized star-tree data + * @throws IOException if an I/O error occurs while writing the star-tree data + */ + public static long serializeStarTree(IndexOutput indexOutput, StarTreeBuilderUtils.TreeNode rootNode, int numNodes) throws IOException { + int headerSizeInBytes = computeStarTreeDataHeaderByteSize(); + long totalSizeInBytes = headerSizeInBytes + (long) numNodes * SERIALIZABLE_DATA_SIZE_IN_BYTES; + + logger.info("Star tree size in bytes : {}", totalSizeInBytes); + + writeStarTreeHeader(indexOutput, numNodes); + writeStarTreeNodes(indexOutput, rootNode); + return totalSizeInBytes; + } + + /** + * Computes the byte size of the star-tree data header. + * + * @return the byte size of the star-tree data header + */ + private static int computeStarTreeDataHeaderByteSize() { + // Magic marker (8), version (4) + int headerSizeInBytes = 12; + + // For number of nodes. + headerSizeInBytes += Integer.BYTES; + return headerSizeInBytes; + } + + /** + * Writes the star-tree data header. + * + * @param output the IndexOutput to write the header + * @param numNodes the total number of nodes in the star-tree + * @throws IOException if an I/O error occurs while writing the header + */ + private static void writeStarTreeHeader(IndexOutput output, int numNodes) throws IOException { + output.writeLong(MAGIC_MARKER); + output.writeInt(VERSION); + output.writeInt(numNodes); + } + + /** + * Writes the star-tree nodes in a breadth-first order. + * + * @param output the IndexOutput to write the nodes + * @param rootNode the root node of the star-tree + * @throws IOException if an I/O error occurs while writing the nodes + */ + private static void writeStarTreeNodes(IndexOutput output, StarTreeBuilderUtils.TreeNode rootNode) throws IOException { + Queue queue = new LinkedList<>(); + queue.add(rootNode); + + int currentNodeId = 0; + while (!queue.isEmpty()) { + StarTreeBuilderUtils.TreeNode node = queue.remove(); + + if (node.children == null) { + writeStarTreeNode(output, node, ALL, ALL); + } else { + + // Sort all children nodes based on dimension value + List sortedChildren = new ArrayList<>(node.children.values()); + sortedChildren.sort(Comparator.comparingLong(o -> o.dimensionValue)); + + int firstChildId = currentNodeId + queue.size() + 1; + int lastChildId = firstChildId + sortedChildren.size() - 1; + writeStarTreeNode(output, node, firstChildId, lastChildId); + + queue.addAll(sortedChildren); + } + + currentNodeId++; + } + } + + /** + * Writes a single star-tree node + * + * @param output the IndexOutput to write the node + * @param node the star tree node to write + * @param firstChildId the ID of the first child node + * @param lastChildId the ID of the last child node + * @throws IOException if an I/O error occurs while writing the node + */ + private static void writeStarTreeNode(IndexOutput output, StarTreeBuilderUtils.TreeNode node, int firstChildId, int lastChildId) + throws IOException { + output.writeInt(node.dimensionId); + output.writeLong(node.dimensionValue); + output.writeInt(node.startDocId); + output.writeInt(node.endDocId); + output.writeInt(node.aggregatedDocId); + output.writeByte(node.isStarNode == false ? (byte) 0 : (byte) 1); + output.writeInt(firstChildId); + output.writeInt(lastChildId); + } + +} diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeHelper.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeHelper.java new file mode 100644 index 0000000000000..8cbb7508ee1ef --- /dev/null +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeHelper.java @@ -0,0 +1,38 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.compositeindex.datacube.startree.utils; + +import org.opensearch.index.compositeindex.datacube.startree.aggregators.MetricAggregatorInfo; + +/** + * This class contains helper methods used throughout the Star Tree index implementation. + * + * @opensearch.experimental + */ +public class StarTreeHelper { + + /** + * The suffix appended to dimension field names in the Star Tree index. + */ + public static final String DIMENSION_SUFFIX = "dim"; + + /** + * The suffix appended to metric field names in the Star Tree index. + */ + public static final String METRIC_SUFFIX = "metric"; + + public static String fullFieldNameForStarTreeDimensionsDocValues(String starTreeFieldName, String dimensionName) { + return starTreeFieldName + "_" + dimensionName + "_" + DIMENSION_SUFFIX; + } + + public static String fullFieldNameForStarTreeMetricsDocValues(String name, String fieldName, String metricName) { + return MetricAggregatorInfo.toFieldName(name, fieldName, metricName) + "_" + METRIC_SUFFIX; + } + +} diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeMetaSerializer.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeMetaSerializer.java new file mode 100644 index 0000000000000..e1091ba35d3f8 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeMetaSerializer.java @@ -0,0 +1,223 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.compositeindex.datacube.startree.utils; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.index.SegmentWriteState; +import org.apache.lucene.store.IndexOutput; +import org.opensearch.index.compositeindex.datacube.Dimension; +import org.opensearch.index.compositeindex.datacube.startree.StarTreeField; +import org.opensearch.index.compositeindex.datacube.startree.aggregators.MetricAggregatorInfo; +import org.opensearch.index.mapper.CompositeMappedFieldType; + +import java.io.IOException; +import java.util.List; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.opensearch.index.compositeindex.CompositeIndexConstants.MAGIC_MARKER; +import static org.opensearch.index.compositeindex.CompositeIndexConstants.VERSION; + +/** + * The utility class for serializing the metadata of a star-tree data structure. + * The metadata includes information about the dimensions, metrics, and other relevant details + * related to the star tree. + * + * @opensearch.experimental + */ +public class StarTreeMetaSerializer { + + private static final Logger logger = LogManager.getLogger(StarTreeMetaSerializer.class); + + /** + * Serializes the star-tree metadata. + * + * @param metaOut the IndexOutput to write the metadata + * @param compositeFieldType the composite field type of the star-tree field + * @param starTreeField the star-tree field + * @param writeState the segment write state + * @param metricAggregatorInfos the list of metric aggregator information + * @param segmentAggregatedCount the aggregated document count for the segment + * @param dataFilePointer the file pointer to the start of the star tree data + * @param dataFileLength the length of the star tree data file + * @throws IOException if an I/O error occurs while serializing the metadata + */ + public static void serializeStarTreeMetadata( + IndexOutput metaOut, + CompositeMappedFieldType.CompositeFieldType compositeFieldType, + StarTreeField starTreeField, + SegmentWriteState writeState, + List metricAggregatorInfos, + Integer segmentAggregatedCount, + long dataFilePointer, + long dataFileLength + ) throws IOException { + long totalSizeInBytes = 0; + + // header size + totalSizeInBytes += computeHeaderByteSize(compositeFieldType, starTreeField.getName()); + // number of dimensions + totalSizeInBytes += Integer.BYTES; + // dimension field numbers + totalSizeInBytes += (long) starTreeField.getDimensionsOrder().size() * Integer.BYTES; + // metric count + totalSizeInBytes += Integer.BYTES; + // metric - metric stat pair + totalSizeInBytes += computeMetricEntriesSizeInBytes(metricAggregatorInfos); + // segment aggregated document count + totalSizeInBytes += Integer.BYTES; + // max leaf docs + totalSizeInBytes += Integer.BYTES; + // skip star node creation dimensions count + totalSizeInBytes += Integer.BYTES; + // skip star node creation dimensions field numbers + totalSizeInBytes += (long) starTreeField.getStarTreeConfig().getSkipStarNodeCreationInDims().size() * Integer.BYTES; + // data start file pointer + totalSizeInBytes += Long.BYTES; + // data length + totalSizeInBytes += Long.BYTES; + + logger.info("Star tree size in bytes : {}", totalSizeInBytes); + + writeMetaHeader(metaOut, compositeFieldType, starTreeField.getName()); + writeMeta(metaOut, writeState, metricAggregatorInfos, starTreeField, segmentAggregatedCount, dataFilePointer, dataFileLength); + } + + /** + * Computes the byte size required to store the star-tree metric entry. + * + * @param metricAggregatorInfos the list of metric aggregator information + * @return the byte size required to store the metric-metric stat pairs + */ + private static long computeMetricEntriesSizeInBytes(List metricAggregatorInfos) { + + long totalMetricEntriesSize = 0; + + for (MetricAggregatorInfo metricAggregatorInfo : metricAggregatorInfos) { + totalMetricEntriesSize += metricAggregatorInfo.getField().getBytes(UTF_8).length; + totalMetricEntriesSize += metricAggregatorInfo.getMetricStat().getTypeName().getBytes(UTF_8).length; + } + + return totalMetricEntriesSize; + } + + /** + * Computes the byte size of the star-tree metadata header. + * + * @param compositeFieldType the composite field type of the star-tree field + * @param starTreeFieldName the name of the star-tree field + * @return the byte size of the star-tree metadata header + */ + private static int computeHeaderByteSize(CompositeMappedFieldType.CompositeFieldType compositeFieldType, String starTreeFieldName) { + // Magic marker (8), version (4), size of header (4) + int headerSizeInBytes = 16; + + // For star-tree field name + headerSizeInBytes += starTreeFieldName.getBytes(UTF_8).length; + + // For star tree field type + headerSizeInBytes += compositeFieldType.getName().getBytes(UTF_8).length; + + return headerSizeInBytes; + } + + /** + * Writes the star-tree metadata header. + * + * @param metaOut the IndexOutput to write the header + * @param compositeFieldType the composite field type of the star-tree field + * @param starTreeFieldName the name of the star-tree field + * @throws IOException if an I/O error occurs while writing the header + */ + private static void writeMetaHeader( + IndexOutput metaOut, + CompositeMappedFieldType.CompositeFieldType compositeFieldType, + String starTreeFieldName + ) throws IOException { + // magic marker for sanity + metaOut.writeLong(MAGIC_MARKER); + + // version + metaOut.writeInt(VERSION); + + // star tree field name + metaOut.writeString(starTreeFieldName); + + // star tree field type + metaOut.writeString(compositeFieldType.getName()); + } + + /** + * Writes the star-tree metadata. + * + * @param metaOut the IndexOutput to write the metadata + * @param writeState the segment write state + * @param metricAggregatorInfos the list of metric aggregator information + * @param starTreeField the star tree field + * @param segmentAggregatedDocCount the aggregated document count for the segment + * @param dataFilePointer the file pointer to the start of the star-tree data + * @param dataFileLength the length of the star-tree data file + * @throws IOException if an I/O error occurs while writing the metadata + */ + private static void writeMeta( + IndexOutput metaOut, + SegmentWriteState writeState, + List metricAggregatorInfos, + StarTreeField starTreeField, + Integer segmentAggregatedDocCount, + long dataFilePointer, + long dataFileLength + ) throws IOException { + + // number of dimensions + metaOut.writeInt(starTreeField.getDimensionsOrder().size()); + + // dimensions + for (Dimension dimension : starTreeField.getDimensionsOrder()) { + int dimensionFieldNumber = writeState.fieldInfos.fieldInfo(dimension.getField()).getFieldNumber(); + metaOut.writeInt(dimensionFieldNumber); + } + + // number of metrics + metaOut.writeInt(metricAggregatorInfos.size()); + + // metric - metric stat pair + for (MetricAggregatorInfo metricAggregatorInfo : metricAggregatorInfos) { + String metricName = metricAggregatorInfo.getField(); + String metricStatName = metricAggregatorInfo.getMetricStat().getTypeName(); + metaOut.writeString(metricName); + metaOut.writeString(metricStatName); + } + + // segment aggregated document count + metaOut.writeInt(segmentAggregatedDocCount); + + // max leaf docs + metaOut.writeInt(starTreeField.getStarTreeConfig().maxLeafDocs()); + + // number of skip star node creation dimensions + metaOut.writeInt(starTreeField.getStarTreeConfig().getSkipStarNodeCreationInDims().size()); + + // skip star node creations + for (String dimension : starTreeField.getStarTreeConfig().getSkipStarNodeCreationInDims()) { + int dimensionFieldNumber = writeState.fieldInfos.fieldInfo(dimension).getFieldNumber(); + metaOut.writeInt(dimensionFieldNumber); + } + + // star tree build-mode + metaOut.writeString(starTreeField.getStarTreeConfig().getBuildMode().getTypeName()); + + // star-tree data file pointer + metaOut.writeLong(dataFilePointer); + + // star-tree data file length + metaOut.writeLong(dataFileLength); + + } +} diff --git a/server/src/main/java/org/opensearch/index/compositeindex/package-info.java b/server/src/main/java/org/opensearch/index/compositeindex/package-info.java index 59f18efec26b1..9a88f88d9850a 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/package-info.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/package-info.java @@ -8,6 +8,7 @@ /** * Core classes for handling composite indices. - * @opensearch.experimental + * + * @opensearch.experimental */ package org.opensearch.index.compositeindex; diff --git a/server/src/test/java/org/opensearch/index/codec/composite/datacube/startree/StarTreeDocValuesFormatTests.java b/server/src/test/java/org/opensearch/index/codec/composite/datacube/startree/StarTreeDocValuesFormatTests.java index 049d91bc42d9c..9a7ae7525e5db 100644 --- a/server/src/test/java/org/opensearch/index/codec/composite/datacube/startree/StarTreeDocValuesFormatTests.java +++ b/server/src/test/java/org/opensearch/index/codec/composite/datacube/startree/StarTreeDocValuesFormatTests.java @@ -40,6 +40,7 @@ import java.util.Collections; import static org.opensearch.common.util.FeatureFlags.STAR_TREE_INDEX; +import static org.opensearch.test.OpenSearchTestCase.randomFrom; /** * Star tree doc values Lucene tests @@ -76,6 +77,32 @@ protected Codec getCodec() { return codec; } + private StarTreeMapper.StarTreeFieldType getStarTreeFieldType() { + List m1 = new ArrayList<>(); + m1.add(MetricStat.MAX); + Metric metric = new Metric("sndv", m1); + List d1CalendarIntervals = new ArrayList<>(); + d1CalendarIntervals.add(Rounding.DateTimeUnit.HOUR_OF_DAY); + StarTreeField starTreeField = getStarTreeField(d1CalendarIntervals, metric); + + return new StarTreeMapper.StarTreeFieldType("star_tree", starTreeField); + } + + private static StarTreeField getStarTreeField(List d1CalendarIntervals, Metric metric1) { + DateDimension d1 = new DateDimension("field", d1CalendarIntervals); + NumericDimension d2 = new NumericDimension("dv"); + + List metrics = List.of(metric1); + List dims = List.of(d1, d2); + StarTreeFieldConfiguration config = new StarTreeFieldConfiguration( + 100, + Collections.emptySet(), + randomFrom(StarTreeFieldConfiguration.StarTreeBuildMode.ON_HEAP) // TODO : change it + ); + + return new StarTreeField("starTree", dims, metrics, config); + } + public void testStarTreeDocValues() throws IOException { Directory directory = newDirectory(); IndexWriterConfig conf = newIndexWriterConfig(null); diff --git a/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilderTests.java b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilderTests.java index 26e2cb8e391f4..6a8ff56d5bb35 100644 --- a/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilderTests.java +++ b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilderTests.java @@ -8,20 +8,24 @@ package org.opensearch.index.compositeindex.datacube.startree.builder; +import org.apache.lucene.codecs.DocValuesConsumer; import org.apache.lucene.codecs.DocValuesProducer; import org.apache.lucene.codecs.lucene99.Lucene99Codec; import org.apache.lucene.index.DocValuesType; import org.apache.lucene.index.FieldInfo; import org.apache.lucene.index.FieldInfos; +import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.index.IndexOptions; import org.apache.lucene.index.SegmentInfo; import org.apache.lucene.index.SegmentWriteState; import org.apache.lucene.index.VectorEncoding; import org.apache.lucene.index.VectorSimilarityFunction; import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IndexOutput; import org.apache.lucene.util.InfoStream; import org.apache.lucene.util.Version; import org.opensearch.common.settings.Settings; +import org.opensearch.index.codec.composite.Composite99DocValuesFormat; import org.opensearch.index.codec.composite.datacube.startree.StarTreeValues; import org.opensearch.index.compositeindex.datacube.Dimension; import org.opensearch.index.compositeindex.datacube.Metric; @@ -51,6 +55,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -75,9 +80,12 @@ public class BaseStarTreeBuilderTests extends OpenSearchTestCase { private static List metrics; private static Directory directory; private static FieldInfo[] fieldsInfo; - private static SegmentWriteState state; + private static SegmentWriteState writeState; private static StarTreeField starTreeField; + private static IndexOutput dataOut; + private static IndexOutput metaOut; + @BeforeClass public static void setup() throws IOException { @@ -138,7 +146,21 @@ public static void setup() throws IOException { fieldProducerMap.put(fields.get(i), docValuesProducer); } FieldInfos fieldInfos = new FieldInfos(fieldsInfo); - state = new SegmentWriteState(InfoStream.getDefault(), segmentInfo.dir, segmentInfo, fieldInfos, null, newIOContext(random())); + writeState = new SegmentWriteState(InfoStream.getDefault(), segmentInfo.dir, segmentInfo, fieldInfos, null, newIOContext(random())); + + String dataFileName = IndexFileNames.segmentFileName( + writeState.segmentInfo.name, + writeState.segmentSuffix, + Composite99DocValuesFormat.DATA_EXTENSION + ); + dataOut = writeState.directory.createOutput(dataFileName, writeState.context); + + String metaFileName = IndexFileNames.segmentFileName( + writeState.segmentInfo.name, + writeState.segmentSuffix, + Composite99DocValuesFormat.META_EXTENSION + ); + metaOut = writeState.directory.createOutput(metaFileName, writeState.context); mapperService = mock(MapperService.class); DocumentMapper documentMapper = mock(DocumentMapper.class); @@ -157,9 +179,13 @@ public static void setup() throws IOException { ); when(documentMapper.mappers()).thenReturn(fieldMappers); - builder = new BaseStarTreeBuilder(starTreeField, state, mapperService) { + builder = new BaseStarTreeBuilder(metaOut, dataOut, starTreeField, writeState, mapperService) { @Override - public void build(List starTreeValuesSubs) throws IOException {} + public void build( + List starTreeValuesSubs, + AtomicInteger fieldNumberAcrossStarTrees, + DocValuesConsumer starTreeDocValuesConsumer + ) throws IOException {} @Override public void appendStarTreeDocument(StarTreeDocument starTreeDocument) throws IOException {} @@ -169,6 +195,11 @@ public StarTreeDocument getStarTreeDocument(int docId) throws IOException { return null; } + @Override + public StarTreeDocument getStarTreeDocumentForCreatingDocValues(int docId) throws IOException { + return null; + } + @Override public List getStarTreeDocuments() { return List.of(); @@ -219,6 +250,8 @@ public void test_reduceStarTreeDocuments() { @Override public void tearDown() throws Exception { super.tearDown(); + dataOut.close(); + metaOut.close(); directory.close(); } } diff --git a/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/OnHeapStarTreeBuilderTests.java b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/OnHeapStarTreeBuilderTests.java index ec0e2ba838730..0d5dc3d9ae77d 100644 --- a/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/OnHeapStarTreeBuilderTests.java +++ b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/OnHeapStarTreeBuilderTests.java @@ -8,11 +8,13 @@ package org.opensearch.index.compositeindex.datacube.startree.builder; +import org.apache.lucene.codecs.DocValuesConsumer; import org.apache.lucene.codecs.DocValuesProducer; import org.apache.lucene.codecs.lucene99.Lucene99Codec; import org.apache.lucene.index.DocValuesType; import org.apache.lucene.index.FieldInfo; import org.apache.lucene.index.FieldInfos; +import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.index.IndexOptions; import org.apache.lucene.index.SegmentInfo; import org.apache.lucene.index.SegmentWriteState; @@ -22,10 +24,12 @@ import org.apache.lucene.sandbox.document.HalfFloatPoint; import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IndexOutput; import org.apache.lucene.util.InfoStream; import org.apache.lucene.util.NumericUtils; import org.apache.lucene.util.Version; import org.opensearch.common.settings.Settings; +import org.opensearch.index.codec.composite.Composite99DocValuesFormat; import org.opensearch.index.codec.composite.datacube.startree.StarTreeValues; import org.opensearch.index.compositeindex.datacube.Dimension; import org.opensearch.index.compositeindex.datacube.Metric; @@ -54,6 +58,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -70,6 +75,9 @@ public class OnHeapStarTreeBuilderTests extends OpenSearchTestCase { private StarTreeField compositeField; private Map fieldProducerMap; private SegmentWriteState writeState; + private IndexOutput dataOut; + private IndexOutput metaOut; + private DocValuesConsumer docValuesConsumer; @Before public void setup() throws IOException { @@ -90,6 +98,7 @@ public void setup() throws IOException { ); DocValuesProducer docValuesProducer = mock(DocValuesProducer.class); + docValuesConsumer = mock(DocValuesConsumer.class); compositeField = new StarTreeField( "test", @@ -140,6 +149,20 @@ public void setup() throws IOException { FieldInfos fieldInfos = new FieldInfos(fieldsInfo); writeState = new SegmentWriteState(InfoStream.getDefault(), segmentInfo.dir, segmentInfo, fieldInfos, null, newIOContext(random())); + String dataFileName = IndexFileNames.segmentFileName( + writeState.segmentInfo.name, + writeState.segmentSuffix, + Composite99DocValuesFormat.DATA_EXTENSION + ); + dataOut = writeState.directory.createOutput(dataFileName, writeState.context); + + String metaFileName = IndexFileNames.segmentFileName( + writeState.segmentInfo.name, + writeState.segmentSuffix, + Composite99DocValuesFormat.META_EXTENSION + ); + metaOut = writeState.directory.createOutput(metaFileName, writeState.context); + mapperService = mock(MapperService.class); DocumentMapper documentMapper = mock(DocumentMapper.class); when(mapperService.documentMapper()).thenReturn(documentMapper); @@ -162,7 +185,7 @@ public void setup() throws IOException { null ); when(documentMapper.mappers()).thenReturn(fieldMappers); - builder = new OnHeapStarTreeBuilder(compositeField, writeState, mapperService); + builder = new OnHeapStarTreeBuilder(metaOut, dataOut, compositeField, writeState, mapperService); } public void test_sortAndAggregateStarTreeDocuments() throws IOException { @@ -397,7 +420,7 @@ public void test_build_halfFloatMetrics() throws IOException { null ); when(documentMapper.mappers()).thenReturn(fieldMappers); - builder = new OnHeapStarTreeBuilder(compositeField, writeState, mapperService); + builder = new OnHeapStarTreeBuilder(metaOut, dataOut, compositeField, writeState, mapperService); int noOfStarTreeDocuments = 5; StarTreeDocument[] starTreeDocuments = new StarTreeDocument[noOfStarTreeDocuments]; @@ -460,7 +483,7 @@ public void test_build_halfFloatMetrics() throws IOException { } Iterator segmentStarTreeDocumentIterator = builder.sortAndAggregateStarTreeDocuments(segmentStarTreeDocuments); - builder.build(segmentStarTreeDocumentIterator); + builder.build(segmentStarTreeDocumentIterator, new AtomicInteger(), docValuesConsumer); List resultStarTreeDocuments = builder.getStarTreeDocuments(); assertEquals(7, resultStarTreeDocuments.size()); @@ -492,7 +515,7 @@ public void test_build_floatMetrics() throws IOException { null ); when(documentMapper.mappers()).thenReturn(fieldMappers); - builder = new OnHeapStarTreeBuilder(compositeField, writeState, mapperService); + builder = new OnHeapStarTreeBuilder(metaOut, dataOut, compositeField, writeState, mapperService); int noOfStarTreeDocuments = 5; StarTreeDocument[] starTreeDocuments = new StarTreeDocument[noOfStarTreeDocuments]; @@ -525,7 +548,7 @@ public void test_build_floatMetrics() throws IOException { } Iterator segmentStarTreeDocumentIterator = builder.sortAndAggregateStarTreeDocuments(segmentStarTreeDocuments); - builder.build(segmentStarTreeDocumentIterator); + builder.build(segmentStarTreeDocumentIterator, new AtomicInteger(), docValuesConsumer); List resultStarTreeDocuments = builder.getStarTreeDocuments(); assertEquals(7, resultStarTreeDocuments.size()); @@ -558,7 +581,7 @@ public void test_build_longMetrics() throws IOException { null ); when(documentMapper.mappers()).thenReturn(fieldMappers); - builder = new OnHeapStarTreeBuilder(compositeField, writeState, mapperService); + builder = new OnHeapStarTreeBuilder(metaOut, dataOut, compositeField, writeState, mapperService); int noOfStarTreeDocuments = 5; StarTreeDocument[] starTreeDocuments = new StarTreeDocument[noOfStarTreeDocuments]; @@ -579,7 +602,7 @@ public void test_build_longMetrics() throws IOException { } Iterator segmentStarTreeDocumentIterator = builder.sortAndAggregateStarTreeDocuments(segmentStarTreeDocuments); - builder.build(segmentStarTreeDocumentIterator); + builder.build(segmentStarTreeDocumentIterator, new AtomicInteger(), docValuesConsumer); List resultStarTreeDocuments = builder.getStarTreeDocuments(); assertEquals(7, resultStarTreeDocuments.size()); @@ -622,7 +645,7 @@ public void test_build() throws IOException { } Iterator segmentStarTreeDocumentIterator = builder.sortAndAggregateStarTreeDocuments(segmentStarTreeDocuments); - builder.build(segmentStarTreeDocumentIterator); + builder.build(segmentStarTreeDocumentIterator, new AtomicInteger(), docValuesConsumer); List resultStarTreeDocuments = builder.getStarTreeDocuments(); assertEquals(7, resultStarTreeDocuments.size()); @@ -890,8 +913,9 @@ public void testMergeFlow() throws IOException { Map f2dimDocIdSetIterators = Map.of("field1", f2d1sndv, "field3", f2d2sndv); Map f2metricDocIdSetIterators = Map.of("field2", f2m1sndv); StarTreeValues starTreeValues2 = new StarTreeValues(sf, null, f2dimDocIdSetIterators, f2metricDocIdSetIterators); - OnHeapStarTreeBuilder builder = new OnHeapStarTreeBuilder(sf, writeState, mapperService); + OnHeapStarTreeBuilder builder = new OnHeapStarTreeBuilder(metaOut, dataOut, sf, writeState, mapperService); Iterator starTreeDocumentIterator = builder.mergeStarTrees(List.of(starTreeValues, starTreeValues2)); + /** * Asserting following dim / metrics [ dim1, dim2 / Sum [ metric] ] * [0, 0] | [0.0] @@ -1021,6 +1045,8 @@ public long cost() { @Override public void tearDown() throws Exception { super.tearDown(); + dataOut.close(); + metaOut.close(); directory.close(); } }