From db1eb0d5a7a19b3e9be32a014de17e2539badf81 Mon Sep 17 00:00:00 2001 From: Bharathwaj G Date: Tue, 2 Jul 2024 15:11:10 +0530 Subject: [PATCH] Star tree merge changes Signed-off-by: Bharathwaj G --- .../lucene/index/BaseStarTreeBuilder.java | 257 +++++++++--------- .../composite/Composite90DocValuesReader.java | 6 +- .../composite/Composite90DocValuesWriter.java | 55 +++- .../composite/CompositeIndexFieldInfo.java | 36 +++ .../codec/composite/CompositeIndexReader.java | 5 +- .../datacube/startree/StarTreeValues.java | 40 ++- .../aggregators/MetricAggregatorInfo.java | 18 +- .../builder/OnHeapStarTreeBuilder.java | 131 +++++++-- .../startree/builder/StarTreeBuilder.java | 21 +- .../StarTreeDocValuesIteratorAdapter.java | 82 ------ .../startree/builder/StarTreesBuilder.java | 48 ++-- .../datacube/startree/node/StarTreeNode.java | 57 ++++ .../datacube/startree/node/package-info.java | 12 + .../utils/SequentialDocValuesIterator.java | 43 ++- .../MetricAggregatorInfoTests.java | 34 +-- .../builder/BaseStarTreeBuilderTests.java | 21 +- .../builder/OnHeapStarTreeBuilderTests.java | 137 +++++++++- ...ests.java => SequentialIteratorTests.java} | 48 ++-- 18 files changed, 715 insertions(+), 336 deletions(-) create mode 100644 server/src/main/java/org/opensearch/index/codec/composite/CompositeIndexFieldInfo.java delete mode 100644 server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreeDocValuesIteratorAdapter.java create mode 100644 server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/StarTreeNode.java create mode 100644 server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/package-info.java rename server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/{StarTreeValuesIteratorFactoryTests.java => SequentialIteratorTests.java} (70%) diff --git a/server/src/main/java/org/apache/lucene/index/BaseStarTreeBuilder.java b/server/src/main/java/org/apache/lucene/index/BaseStarTreeBuilder.java index 309bba2b79ea0..658f1ea22e767 100644 --- a/server/src/main/java/org/apache/lucene/index/BaseStarTreeBuilder.java +++ b/server/src/main/java/org/apache/lucene/index/BaseStarTreeBuilder.java @@ -20,7 +20,6 @@ import org.opensearch.index.compositeindex.datacube.startree.aggregators.ValueAggregator; import org.opensearch.index.compositeindex.datacube.startree.aggregators.numerictype.StarTreeNumericType; import org.opensearch.index.compositeindex.datacube.startree.builder.StarTreeBuilder; -import org.opensearch.index.compositeindex.datacube.startree.builder.StarTreeDocValuesIteratorAdapter; import org.opensearch.index.compositeindex.datacube.startree.builder.StarTreesBuilder; import org.opensearch.index.compositeindex.datacube.startree.utils.SequentialDocValuesIterator; import org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeBuilderUtils; @@ -52,7 +51,7 @@ public abstract class BaseStarTreeBuilder implements StarTreeBuilder { /** * Default value for star node */ - public static final int STAR_IN_DOC_VALUES_INDEX = -1; + public static final Long STAR_IN_DOC_VALUES_INDEX = null; protected final Set skipStarNodeCreationForDimensions; @@ -65,59 +64,42 @@ public abstract class BaseStarTreeBuilder implements StarTreeBuilder { protected final int maxLeafDocuments; protected final StarTreeBuilderUtils.TreeNode rootNode = getNewNode(); - - protected SequentialDocValuesIterator[] dimensionReaders; - - protected Map fieldProducerMap; - - private final StarTreeDocValuesIteratorAdapter starTreeDocValuesIteratorAdapter; private final StarTreeField starTreeField; + private final MapperService mapperService; + private final SegmentWriteState state; + /** - * Reads all the configuration related to dimensions and metrics, builds a star-tree based on the different construction parameters. + * Builds star tree based on star tree field configuration consisting of dimensions, metrics and star tree index specific configuration. * * @param starTreeField holds the configuration for the star tree - * @param fieldProducerMap helps return the doc values iterator for each type based on field name * @param state stores the segment write state * @param mapperService helps to find the original type of the field */ - protected BaseStarTreeBuilder( - StarTreeField starTreeField, - Map fieldProducerMap, - SegmentWriteState state, - MapperService mapperService - ) throws IOException { + protected BaseStarTreeBuilder(StarTreeField starTreeField, SegmentWriteState state, MapperService mapperService) throws IOException { - logger.debug("Building in base star tree builder"); + logger.debug("Building star tree : {}", starTreeField); this.starTreeField = starTreeField; StarTreeFieldConfiguration starTreeFieldSpec = starTreeField.getStarTreeConfig(); - this.fieldProducerMap = fieldProducerMap; - this.starTreeDocValuesIteratorAdapter = new StarTreeDocValuesIteratorAdapter(); List dimensionsSplitOrder = starTreeField.getDimensionsOrder(); this.numDimensions = dimensionsSplitOrder.size(); this.skipStarNodeCreationForDimensions = new HashSet<>(); this.totalSegmentDocs = state.segmentInfo.maxDoc(); - this.dimensionReaders = new SequentialDocValuesIterator[numDimensions]; + this.mapperService = mapperService; + this.state = state; + Set skipStarNodeCreationForDimensions = starTreeFieldSpec.getSkipStarNodeCreationInDims(); for (int i = 0; i < numDimensions; i++) { - String dimension = dimensionsSplitOrder.get(i).getField(); if (skipStarNodeCreationForDimensions.contains(dimensionsSplitOrder.get(i).getField())) { this.skipStarNodeCreationForDimensions.add(i); } - FieldInfo dimensionFieldInfos = state.fieldInfos.fieldInfo(dimension); - DocValuesType dimensionDocValuesType = dimensionFieldInfos.getDocValuesType(); - dimensionReaders[i] = starTreeDocValuesIteratorAdapter.getDocValuesIterator( - dimensionDocValuesType, - dimensionFieldInfos, - fieldProducerMap.get(dimensionFieldInfos.name) - ); } - this.metricAggregatorInfos = generateMetricAggregatorInfos(mapperService, state); + this.metricAggregatorInfos = generateMetricAggregatorInfos(mapperService); this.numMetrics = metricAggregatorInfos.size(); this.maxLeafDocuments = starTreeFieldSpec.maxLeafDocs(); } @@ -127,13 +109,11 @@ protected BaseStarTreeBuilder( * * @return list of MetricAggregatorInfo */ - public List generateMetricAggregatorInfos(MapperService mapperService, SegmentWriteState state) - throws IOException { + public List generateMetricAggregatorInfos(MapperService mapperService) { List metricAggregatorInfos = new ArrayList<>(); for (Metric metric : this.starTreeField.getMetrics()) { for (MetricStat metricType : metric.getMetrics()) { IndexNumericFieldData.NumericType numericType; - SequentialDocValuesIterator metricStatReader = null; Mapper fieldMapper = mapperService.documentMapper().mappers().getMapper(metric.getField()); if (fieldMapper instanceof NumberFieldMapper) { numericType = ((NumberFieldMapper) fieldMapper).fieldType().numericType(); @@ -142,23 +122,11 @@ public List generateMetricAggregatorInfos(MapperService ma throw new IllegalStateException("unsupported mapper type"); } - FieldInfo metricFieldInfos = state.fieldInfos.fieldInfo(metric.getField()); - DocValuesType metricDocValuesType = metricFieldInfos.getDocValuesType(); - if (metricType != MetricStat.COUNT) { - // Need not initialize the metric reader for COUNT metric type - metricStatReader = starTreeDocValuesIteratorAdapter.getDocValuesIterator( - metricDocValuesType, - metricFieldInfos, - fieldProducerMap.get(metricFieldInfos.name) - ); - } - MetricAggregatorInfo metricAggregatorInfo = new MetricAggregatorInfo( metricType, metric.getField(), starTreeField.getName(), - numericType, - metricStatReader + numericType ); metricAggregatorInfos.add(metricAggregatorInfo); } @@ -166,6 +134,102 @@ public List generateMetricAggregatorInfos(MapperService ma return metricAggregatorInfos; } + /** + * Generates the configuration required to perform aggregation for all the metrics on a field + * + * @return list of MetricAggregatorInfo + */ + public List getMetricReaders(SegmentWriteState state, Map fieldProducerMap) + throws IOException { + List metricReaders = new ArrayList<>(); + for (Metric metric : this.starTreeField.getMetrics()) { + for (MetricStat metricType : metric.getMetrics()) { + SequentialDocValuesIterator metricReader = null; + + FieldInfo metricFieldInfo = state.fieldInfos.fieldInfo(metric.getField()); + if (metricType != MetricStat.COUNT) { + // Need not initialize the metric reader for COUNT metric type + metricReader = new SequentialDocValuesIterator( + fieldProducerMap.get(metricFieldInfo.name).getSortedNumeric(metricFieldInfo) + ); + } + + metricReaders.add(metricReader); + } + } + return metricReaders; + } + + /** + * Builds the star tree from the original segment documents + * + * @param fieldProducerMap contains the docValues producer to get docValues associated with each field + * + * @throws IOException when we are unable to build star-tree + */ + public void build(Map fieldProducerMap) throws IOException { + long startTime = System.currentTimeMillis(); + logger.debug("Star-tree build is a go with star tree field {}", starTreeField.getName()); + + List metricReaders = getMetricReaders(state, fieldProducerMap); + List dimensionsSplitOrder = starTreeField.getDimensionsOrder(); + SequentialDocValuesIterator[] dimensionReaders = new SequentialDocValuesIterator[dimensionsSplitOrder.size()]; + for (int i = 0; i < numDimensions; i++) { + String dimension = dimensionsSplitOrder.get(i).getField(); + FieldInfo dimensionFieldInfo = state.fieldInfos.fieldInfo(dimension); + dimensionReaders[i] = new SequentialDocValuesIterator( + fieldProducerMap.get(dimensionFieldInfo.name).getSortedNumeric(dimensionFieldInfo) + ); + } + Iterator starTreeDocumentIterator = sortAndAggregateSegmentDocuments( + totalSegmentDocs, + dimensionReaders, + metricReaders + ); + logger.debug("Sorting and aggregating star-tree in ms : {}", (System.currentTimeMillis() - startTime)); + build(starTreeDocumentIterator); + logger.debug("Finished Building star-tree in ms : {}", (System.currentTimeMillis() - startTime)); + } + + /** + * Builds the star tree using sorted and aggregated star-tree Documents + * + * @param starTreeDocumentIterator contains the sorted and aggregated documents + * @throws IOException when we are unable to build star-tree + */ + public void build(Iterator starTreeDocumentIterator) throws IOException { + int numSegmentStarTreeDocument = totalSegmentDocs; + + while (starTreeDocumentIterator.hasNext()) { + appendToStarTree(starTreeDocumentIterator.next()); + } + int numStarTreeDocument = numStarTreeDocs; + logger.debug("Generated star tree docs : [{}] from segment docs : [{}]", numStarTreeDocument, numSegmentStarTreeDocument); + + if (numStarTreeDocs == 0) { + // TODO: Uncomment when segment codec is ready + // StarTreeBuilderUtils.serializeTree(indexOutput, rootNode, dimensionsSplitOrder, numNodes); + return; + } + + constructStarTree(rootNode, 0, numStarTreeDocs); + int numStarTreeDocumentUnderStarNode = numStarTreeDocs - numStarTreeDocument; + logger.debug( + "Finished constructing star-tree, got [ {} ] tree nodes and [ {} ] starTreeDocument under star-node", + numStarTreeNodes, + numStarTreeDocumentUnderStarNode + ); + + createAggregatedDocs(rootNode); + int numAggregatedStarTreeDocument = numStarTreeDocs - numStarTreeDocument - numStarTreeDocumentUnderStarNode; + logger.debug("Finished creating aggregated documents : {}", numAggregatedStarTreeDocument); + + // TODO: When StarTree Codec is ready + // Create doc values indices in disk + // Serialize and save in disk + // Write star tree metadata for off heap implementation + } + /** * Adds a document to the star-tree. * @@ -200,16 +264,22 @@ public List generateMetricAggregatorInfos(MapperService ma public abstract long getDimensionValue(int docId, int dimensionId) throws IOException; /** - * Sorts and aggregates the star-tree document in the segment, and returns a star-tree document iterator for all the - * aggregated star-tree document. + * Sorts and aggregates all the documents in the segment as per the configuration, and returns a star-tree document iterator for all the + * aggregated star-tree documents. * * @param numDocs number of documents in the given segment + * @param dimensionReaders List of docValues readers to read dimensions from the segment + * @param metricReaders List of docValues readers to read metrics from the segment * @return Iterator for the aggregated star-tree document */ - public abstract Iterator sortAndAggregateStarTreeDocuments(int numDocs) throws IOException; + public abstract Iterator sortAndAggregateSegmentDocuments( + int numDocs, + SequentialDocValuesIterator[] dimensionReaders, + List metricReaders + ) throws IOException; /** - * Generates aggregated star-tree documents for star-node. + * Generates aggregated star-tree documents for star-node. This inserts null for the star-node dimension and generates aggregated documents based on the remaining dimensions * * @param startDocId start document id (inclusive) in the star-tree * @param endDocId end document id (exclusive) in the star-tree @@ -219,14 +289,13 @@ public List generateMetricAggregatorInfos(MapperService ma public abstract Iterator generateStarTreeDocumentsForStarNode(int startDocId, int endDocId, int dimensionId) throws IOException; - /** - * Returns the star-tree document from the segment - * - * @throws IOException when we are unable to build a star tree document from the segment - */ - protected StarTreeDocument getSegmentStarTreeDocument(int currentDocId) throws IOException { - Long[] dimensions = getStarTreeDimensionsFromSegment(currentDocId); - Object[] metrics = getStarTreeMetricsFromSegment(currentDocId); + protected StarTreeDocument getSegmentStarTreeDocument( + int currentDocId, + SequentialDocValuesIterator[] dimensionReaders, + List metricReaders + ) throws IOException { + Long[] dimensions = getStarTreeDimensionsFromSegment(currentDocId, dimensionReaders); + Object[] metrics = getStarTreeMetricsFromSegment(currentDocId, metricReaders); return new StarTreeDocument(dimensions, metrics); } @@ -236,12 +305,12 @@ protected StarTreeDocument getSegmentStarTreeDocument(int currentDocId) throws I * @return dimension values for each of the star-tree dimension * @throws IOException when we are unable to iterate to the next doc for the given dimension readers */ - Long[] getStarTreeDimensionsFromSegment(int currentDocId) throws IOException { + Long[] getStarTreeDimensionsFromSegment(int currentDocId, SequentialDocValuesIterator[] dimensionReaders) throws IOException { Long[] dimensions = new Long[numDimensions]; for (int i = 0; i < numDimensions; i++) { if (dimensionReaders[i] != null) { try { - starTreeDocValuesIteratorAdapter.nextDoc(dimensionReaders[i], currentDocId); + dimensionReaders[i].nextDoc(currentDocId); } catch (IOException e) { logger.error("unable to iterate to next doc", e); throw new RuntimeException("unable to iterate to next doc", e); @@ -249,8 +318,7 @@ Long[] getStarTreeDimensionsFromSegment(int currentDocId) throws IOException { logger.error("unable to read the dimension values from the segment", e); throw new IllegalStateException("unable to read the dimension values from the segment", e); } - - dimensions[i] = starTreeDocValuesIteratorAdapter.getNextValue(dimensionReaders[i], currentDocId); + dimensions[i] = dimensionReaders[i].value(currentDocId); } else { throw new IllegalStateException("dimension readers are empty"); } @@ -264,13 +332,13 @@ Long[] getStarTreeDimensionsFromSegment(int currentDocId) throws IOException { * @return metric values for each of the star-tree metric * @throws IOException when we are unable to iterate to the next doc for the given metric readers */ - private Object[] getStarTreeMetricsFromSegment(int currentDocId) throws IOException { + private Object[] getStarTreeMetricsFromSegment(int currentDocId, List metricsReaders) throws IOException { Object[] metrics = new Object[numMetrics]; for (int i = 0; i < numMetrics; i++) { - SequentialDocValuesIterator metricStatReader = metricAggregatorInfos.get(i).getMetricStatReader(); + SequentialDocValuesIterator metricStatReader = metricsReaders.get(i); if (metricStatReader != null) { try { - starTreeDocValuesIteratorAdapter.nextDoc(metricStatReader, currentDocId); + metricStatReader.nextDoc(currentDocId); } catch (IOException e) { logger.error("unable to iterate to next doc", e); throw new RuntimeException("unable to iterate to next doc", e); @@ -278,7 +346,7 @@ private Object[] getStarTreeMetricsFromSegment(int currentDocId) throws IOExcept logger.error("unable to read the metric values from the segment", e); throw new IllegalStateException("unable to read the metric values from the segment", e); } - metrics[i] = starTreeDocValuesIteratorAdapter.getNextValue(metricStatReader, currentDocId); + metrics[i] = metricStatReader.value(currentDocId); } else { throw new IllegalStateException("metric readers are empty"); } @@ -398,61 +466,6 @@ public StarTreeDocument reduceStarTreeDocuments(StarTreeDocument aggregatedDocum } } - /** - * Builds the star tree using total segment documents - * - * @throws IOException when we are unable to build star-tree - */ - public void build() throws IOException { - long startTime = System.currentTimeMillis(); - logger.debug("Star-tree build is a go with star tree field {}", starTreeField.getName()); - - Iterator starTreeDocumentIterator = sortAndAggregateStarTreeDocuments(totalSegmentDocs); - logger.debug("Sorting and aggregating star-tree in ms : {}", (System.currentTimeMillis() - startTime)); - build(starTreeDocumentIterator); - logger.debug("Finished Building star-tree in ms : {}", (System.currentTimeMillis() - startTime)); - } - - /** - * Builds the star tree using Star-Tree Document - * - * @param starTreeDocumentIterator contains the sorted and aggregated documents - * @throws IOException when we are unable to build star-tree - */ - public void build(Iterator starTreeDocumentIterator) throws IOException { - int numSegmentStarTreeDocument = totalSegmentDocs; - - while (starTreeDocumentIterator.hasNext()) { - appendToStarTree(starTreeDocumentIterator.next()); - } - int numStarTreeDocument = numStarTreeDocs; - logger.debug("Generated star tree docs : [{}] from segment docs : [{}]", numStarTreeDocument, numSegmentStarTreeDocument); - - if (numStarTreeDocs == 0) { - // TODO: Uncomment when segment codec is ready - // StarTreeBuilderUtils.serializeTree(indexOutput, rootNode, dimensionsSplitOrder, numNodes); - return; - } - - constructStarTree(rootNode, 0, numStarTreeDocs); - int numStarTreeDocumentUnderStarNode = numStarTreeDocs - numStarTreeDocument; - logger.debug( - "Finished constructing star-tree, got [ {} ] tree nodes and [ {} ] starTreeDocument under star-node", - numStarTreeNodes, - numStarTreeDocumentUnderStarNode - ); - - createAggregatedDocs(rootNode); - int numAggregatedStarTreeDocument = numStarTreeDocs - numStarTreeDocument - numStarTreeDocumentUnderStarNode; - logger.debug("Finished creating aggregated documents : {}", numAggregatedStarTreeDocument); - - // TODO: When StarTree Codec is ready - // Create doc values indices in disk - // Serialize and save in disk - // Write star tree metadata for off heap implementation - - } - /** * Adds a document to star-tree * @@ -592,7 +605,7 @@ private StarTreeDocument createAggregatedDocs(StarTreeBuilderUtils.TreeNode node throw new IllegalStateException("aggregated star-tree document is null after reducing the documents"); } for (int i = node.dimensionId + 1; i < numDimensions; i++) { - aggregatedStarTreeDocument.dimensions[i] = Long.valueOf(STAR_IN_DOC_VALUES_INDEX); + aggregatedStarTreeDocument.dimensions[i] = STAR_IN_DOC_VALUES_INDEX; } node.aggregatedDocId = numStarTreeDocs; appendToStarTree(aggregatedStarTreeDocument); @@ -618,7 +631,7 @@ private StarTreeDocument createAggregatedDocs(StarTreeBuilderUtils.TreeNode node throw new IllegalStateException("aggregated star-tree document is null after reducing the documents"); } for (int i = node.dimensionId + 1; i < numDimensions; i++) { - aggregatedStarTreeDocument.dimensions[i] = Long.valueOf(STAR_IN_DOC_VALUES_INDEX); + aggregatedStarTreeDocument.dimensions[i] = STAR_IN_DOC_VALUES_INDEX; } node.aggregatedDocId = numStarTreeDocs; appendToStarTree(aggregatedStarTreeDocument); diff --git a/server/src/main/java/org/opensearch/index/codec/composite/Composite90DocValuesReader.java b/server/src/main/java/org/opensearch/index/codec/composite/Composite90DocValuesReader.java index 0c5c5dbde868f..61491cd6850bd 100644 --- a/server/src/main/java/org/opensearch/index/codec/composite/Composite90DocValuesReader.java +++ b/server/src/main/java/org/opensearch/index/codec/composite/Composite90DocValuesReader.java @@ -17,7 +17,6 @@ import org.apache.lucene.index.SortedNumericDocValues; import org.apache.lucene.index.SortedSetDocValues; import org.opensearch.common.annotation.ExperimentalApi; -import org.opensearch.index.mapper.CompositeMappedFieldType; import java.io.IOException; import java.util.List; @@ -74,15 +73,14 @@ public void close() throws IOException { } @Override - public List getCompositeIndexFields() { + public List getCompositeIndexFields() { // todo : read from file formats and get the field names. throw new UnsupportedOperationException(); } @Override - public CompositeIndexValues getCompositeIndexValues(String field, CompositeMappedFieldType.CompositeFieldType fieldType) - throws IOException { + public CompositeIndexValues getCompositeIndexValues(CompositeIndexFieldInfo compositeIndexFieldInfo) throws IOException { // TODO : read compositeIndexValues [starTreeValues] from star tree files throw new UnsupportedOperationException(); } diff --git a/server/src/main/java/org/opensearch/index/codec/composite/Composite90DocValuesWriter.java b/server/src/main/java/org/opensearch/index/codec/composite/Composite90DocValuesWriter.java index 57fce7d0640a0..4e3da8043e795 100644 --- a/server/src/main/java/org/opensearch/index/codec/composite/Composite90DocValuesWriter.java +++ b/server/src/main/java/org/opensearch/index/codec/composite/Composite90DocValuesWriter.java @@ -8,19 +8,26 @@ package org.opensearch.index.codec.composite; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.lucene.codecs.DocValuesConsumer; import org.apache.lucene.codecs.DocValuesProducer; import org.apache.lucene.index.FieldInfo; import org.apache.lucene.index.MergeState; import org.apache.lucene.index.SegmentWriteState; import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.index.codec.composite.datacube.startree.StarTreeValues; +import org.opensearch.index.compositeindex.datacube.startree.StarTreeField; +import org.opensearch.index.compositeindex.datacube.startree.builder.StarTreesBuilder; import org.opensearch.index.mapper.CompositeMappedFieldType; import org.opensearch.index.mapper.MapperService; import org.opensearch.index.mapper.StarTreeMapper; import java.io.IOException; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; @@ -40,6 +47,7 @@ public class Composite90DocValuesWriter extends DocValuesConsumer { private final Set compositeFieldSet; private final Map fieldProducerMap = new HashMap<>(); + private static final Logger logger = LogManager.getLogger(Composite90DocValuesWriter.class); public Composite90DocValuesWriter(DocValuesConsumer delegate, SegmentWriteState segmentWriteState, MapperService mapperService) throws IOException { @@ -101,6 +109,8 @@ private void createCompositeIndicesIfPossible(DocValuesProducer valuesProducer, // TODO : Call StarTree builder } } + StarTreesBuilder starTreesBuilder = new StarTreesBuilder(state, mapperService); + starTreesBuilder.build(fieldProducerMap); } } @@ -109,7 +119,48 @@ public void merge(MergeState mergeState) throws IOException { // TODO : check if class variable will cause concurrency issues this.mergeState = mergeState; super.merge(mergeState); - // TODO : handle merge star tree - // mergeStarTreeFields(mergeState); + mergeCompositeFields(mergeState); + } + + /** + * Merges composite fields from multiple segments + * @param mergeState merge state + */ + private void mergeCompositeFields(MergeState mergeState) throws IOException { + mergeStarTreeFields(mergeState); + } + + /** + * Merges star tree data fields from multiple segments + * @param mergeState merge state + */ + private void mergeStarTreeFields(MergeState mergeState) throws IOException { + Map> starTreeSubsPerField = new HashMap<>(); + Map starTreeFieldMap = new HashMap<>(); + for (int i = 0; i < mergeState.docValuesProducers.length; i++) { + CompositeIndexReader reader = (CompositeIndexReader) mergeState.docValuesProducers[i]; + List compositeFieldInfo = reader.getCompositeIndexFields(); + for (CompositeIndexFieldInfo fieldInfo : compositeFieldInfo) { + if (fieldInfo.getType().equals(CompositeMappedFieldType.CompositeFieldType.STAR_TREE)) { + CompositeIndexValues compositeIndexValues = reader.getCompositeIndexValues(fieldInfo); + if (compositeIndexValues instanceof StarTreeValues) { + List fieldsList = starTreeSubsPerField.getOrDefault(fieldInfo.getField(), Collections.emptyList()); + if (!starTreeFieldMap.containsKey(fieldInfo.getField())) { + starTreeFieldMap.put(fieldInfo.getField(), ((StarTreeValues) compositeIndexValues).getStarTreeField()); + } + // assert star tree configuration is same across segments + else { + assert starTreeFieldMap.get(fieldInfo.getField()) + .equals(((StarTreeValues) compositeIndexValues).getStarTreeField()); + logger.error("Star tree configuration is not same for segments during merge"); + } + fieldsList.add((StarTreeValues) compositeIndexValues); + starTreeSubsPerField.put(fieldInfo.getField(), fieldsList); + } + } + } + } + final StarTreesBuilder starTreesBuilder = new StarTreesBuilder(state, mapperService); + starTreesBuilder.buildDuringMerge(starTreeFieldMap, starTreeSubsPerField); } } diff --git a/server/src/main/java/org/opensearch/index/codec/composite/CompositeIndexFieldInfo.java b/server/src/main/java/org/opensearch/index/codec/composite/CompositeIndexFieldInfo.java new file mode 100644 index 0000000000000..4ac5bcf1b605f --- /dev/null +++ b/server/src/main/java/org/opensearch/index/codec/composite/CompositeIndexFieldInfo.java @@ -0,0 +1,36 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.composite; + +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.index.mapper.CompositeMappedFieldType; + +/** + * Field info details of composite index fields + * + * @opensearch.experimental + */ +@ExperimentalApi +public class CompositeIndexFieldInfo { + private final String field; + private final CompositeMappedFieldType.CompositeFieldType type; + + public CompositeIndexFieldInfo(String field, CompositeMappedFieldType.CompositeFieldType type) { + this.field = field; + this.type = type; + } + + public String getField() { + return field; + } + + public CompositeMappedFieldType.CompositeFieldType getType() { + return type; + } +} diff --git a/server/src/main/java/org/opensearch/index/codec/composite/CompositeIndexReader.java b/server/src/main/java/org/opensearch/index/codec/composite/CompositeIndexReader.java index d02438b75377d..a159b0619bcbb 100644 --- a/server/src/main/java/org/opensearch/index/codec/composite/CompositeIndexReader.java +++ b/server/src/main/java/org/opensearch/index/codec/composite/CompositeIndexReader.java @@ -9,7 +9,6 @@ package org.opensearch.index.codec.composite; import org.opensearch.common.annotation.ExperimentalApi; -import org.opensearch.index.mapper.CompositeMappedFieldType; import java.io.IOException; import java.util.List; @@ -25,10 +24,10 @@ public interface CompositeIndexReader { * Get list of composite index fields from the segment * */ - List getCompositeIndexFields(); + List getCompositeIndexFields(); /** * Get composite index values based on the field name and the field type */ - CompositeIndexValues getCompositeIndexValues(String field, CompositeMappedFieldType.CompositeFieldType fieldType) throws IOException; + CompositeIndexValues getCompositeIndexValues(CompositeIndexFieldInfo fieldInfo) throws IOException; } diff --git a/server/src/main/java/org/opensearch/index/codec/composite/datacube/startree/StarTreeValues.java b/server/src/main/java/org/opensearch/index/codec/composite/datacube/startree/StarTreeValues.java index 8b93222f97f4c..baed90273d311 100644 --- a/server/src/main/java/org/opensearch/index/codec/composite/datacube/startree/StarTreeValues.java +++ b/server/src/main/java/org/opensearch/index/codec/composite/datacube/startree/StarTreeValues.java @@ -8,10 +8,13 @@ package org.opensearch.index.codec.composite.datacube.startree; +import org.apache.lucene.search.DocIdSetIterator; import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.index.codec.composite.CompositeIndexValues; +import org.opensearch.index.compositeindex.datacube.startree.StarTreeField; +import org.opensearch.index.compositeindex.datacube.startree.node.StarTreeNode; -import java.util.List; +import java.util.Map; /** * Concrete class that holds the star tree associated values from the segment @@ -20,16 +23,41 @@ */ @ExperimentalApi public class StarTreeValues implements CompositeIndexValues { - private final List dimensionsOrder; + private final StarTreeField starTreeField; + private final StarTreeNode root; + private final Map dimensionDocValuesIteratorMap; + private final Map metricDocValuesIteratorMap; - // TODO : come up with full set of vales such as dimensions and metrics doc values + star tree - public StarTreeValues(List dimensionsOrder) { - super(); - this.dimensionsOrder = dimensionsOrder; + public StarTreeValues( + StarTreeField starTreeField, + StarTreeNode root, + Map dimensionDocValuesIteratorMap, + Map metricDocValuesIteratorMap + ) { + this.starTreeField = starTreeField; + this.root = root; + this.dimensionDocValuesIteratorMap = dimensionDocValuesIteratorMap; + this.metricDocValuesIteratorMap = metricDocValuesIteratorMap; } @Override public CompositeIndexValues getValues() { return this; } + + public StarTreeField getStarTreeField() { + return starTreeField; + } + + public StarTreeNode getRoot() { + return root; + } + + public Map getDimensionDocValuesIteratorMap() { + return dimensionDocValuesIteratorMap; + } + + public Map getMetricDocValuesIteratorMap() { + return metricDocValuesIteratorMap; + } } diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/MetricAggregatorInfo.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/MetricAggregatorInfo.java index 279e21b75ee18..3895b53fe7466 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/MetricAggregatorInfo.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/MetricAggregatorInfo.java @@ -9,7 +9,6 @@ import org.opensearch.index.compositeindex.datacube.MetricStat; import org.opensearch.index.compositeindex.datacube.startree.aggregators.numerictype.StarTreeNumericType; -import org.opensearch.index.compositeindex.datacube.startree.utils.SequentialDocValuesIterator; import org.opensearch.index.fielddata.IndexNumericFieldData; import java.util.Comparator; @@ -28,22 +27,14 @@ public class MetricAggregatorInfo implements Comparable { private final String field; private final ValueAggregator valueAggregators; private final StarTreeNumericType starTreeNumericType; - private final SequentialDocValuesIterator metricStatReader; /** * Constructor for MetricAggregatorInfo */ - public MetricAggregatorInfo( - MetricStat metricStat, - String field, - String starFieldName, - IndexNumericFieldData.NumericType numericType, - SequentialDocValuesIterator metricStatReader - ) { + public MetricAggregatorInfo(MetricStat metricStat, String field, String starFieldName, IndexNumericFieldData.NumericType numericType) { this.metricStat = metricStat; this.valueAggregators = ValueAggregatorFactory.getValueAggregator(metricStat); this.starTreeNumericType = StarTreeNumericType.fromNumericType(numericType); - this.metricStatReader = metricStatReader; this.field = field; this.starFieldName = starFieldName; this.metric = toFieldName(); @@ -84,13 +75,6 @@ public StarTreeNumericType getAggregatedValueType() { return starTreeNumericType; } - /** - * @return metric value reader iterator - */ - public SequentialDocValuesIterator getMetricStatReader() { - return metricStatReader; - } - /** * @return field name with metric type and field */ diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/OnHeapStarTreeBuilder.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/OnHeapStarTreeBuilder.java index 8eda381d03082..fb23065580b87 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/OnHeapStarTreeBuilder.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/OnHeapStarTreeBuilder.java @@ -7,12 +7,15 @@ */ package org.opensearch.index.compositeindex.datacube.startree.builder; -import org.apache.lucene.codecs.DocValuesProducer; import org.apache.lucene.index.BaseStarTreeBuilder; import org.apache.lucene.index.SegmentWriteState; +import org.apache.lucene.search.DocIdSetIterator; import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.index.codec.composite.datacube.startree.StarTreeValues; +import org.opensearch.index.compositeindex.datacube.Dimension; import org.opensearch.index.compositeindex.datacube.startree.StarTreeDocument; import org.opensearch.index.compositeindex.datacube.startree.StarTreeField; +import org.opensearch.index.compositeindex.datacube.startree.utils.SequentialDocValuesIterator; import org.opensearch.index.mapper.MapperService; import java.io.IOException; @@ -23,7 +26,7 @@ import java.util.Map; /** - * On heap single tree builder + * On heap based single tree builder * @opensearch.experimental */ @ExperimentalApi @@ -35,18 +38,13 @@ public class OnHeapStarTreeBuilder extends BaseStarTreeBuilder { * Constructor for OnHeapStarTreeBuilder * * @param starTreeField star-tree field - * @param fieldProducerMap helps with document values producer for a particular field * @param segmentWriteState segment write state * @param mapperService helps with the numeric type of field - * @throws IOException throws an exception we are unable to construct an onheap star-tree + * @throws IOException throws an exception when we are unable to construct a star-tree using on-heap approach */ - public OnHeapStarTreeBuilder( - StarTreeField starTreeField, - Map fieldProducerMap, - SegmentWriteState segmentWriteState, - MapperService mapperService - ) throws IOException { - super(starTreeField, fieldProducerMap, segmentWriteState, mapperService); + public OnHeapStarTreeBuilder(StarTreeField starTreeField, SegmentWriteState segmentWriteState, MapperService mapperService) + throws IOException { + super(starTreeField, segmentWriteState, mapperService); } @Override @@ -54,6 +52,81 @@ public void appendStarTreeDocument(StarTreeDocument starTreeDocument) throws IOE starTreeDocuments.add(starTreeDocument); } + @Override + public void build(List starTreeValuesSubs) throws IOException { + build(mergeStarTrees(starTreeValuesSubs)); + } + + /** + * Sorts and aggregates the star-tree documents from multiple segments and builds star tree based on the newly + * aggregated star-tree documents + * + * @param starTreeValuesSubs StarTreeValues from multiple segments + * @return iterator of star tree documents + */ + Iterator mergeStarTrees(List starTreeValuesSubs) throws IOException { + return sortAndAggregateStarTreeDocuments(mergeStarTreeValues(starTreeValuesSubs)); + } + + /** + * Returns an array of all the starTreeDocuments from all the segments + * + * @param starTreeValuesSubs StarTreeValues from multiple segments + * @return array of star tree documents + */ + StarTreeDocument[] mergeStarTreeValues(List starTreeValuesSubs) throws IOException { + List starTreeDocuments = new ArrayList<>(); + for (StarTreeValues starTreeValues : starTreeValuesSubs) { + List dimensionsSplitOrder = starTreeValues.getStarTreeField().getDimensionsOrder(); + SequentialDocValuesIterator[] dimensionReaders = new SequentialDocValuesIterator[starTreeValues.getStarTreeField() + .getDimensionsOrder() + .size()]; + + for (int i = 0; i < dimensionsSplitOrder.size(); i++) { + String dimension = dimensionsSplitOrder.get(i).getField(); + dimensionReaders[i] = new SequentialDocValuesIterator(starTreeValues.getDimensionDocValuesIteratorMap().get(dimension)); + } + + List metricReaders = new ArrayList<>(); + for (Map.Entry metricDocValuesEntry : starTreeValues.getMetricDocValuesIteratorMap().entrySet()) { + metricReaders.add(new SequentialDocValuesIterator(metricDocValuesEntry.getValue())); + } + + boolean endOfDoc = false; + int currentDocId = 0; + while (!endOfDoc) { + Long[] dims = new Long[starTreeValues.getStarTreeField().getDimensionsOrder().size()]; + int i = 0; + for (SequentialDocValuesIterator dimensionDocValueIterator : dimensionReaders) { + int doc = dimensionDocValueIterator.nextDoc(currentDocId); + Long val = dimensionDocValueIterator.value(currentDocId); + // TODO : figure out how to identify a row with star tree docs here + endOfDoc = (doc == DocIdSetIterator.NO_MORE_DOCS); + if (endOfDoc) { + break; + } + dims[i] = val; + i++; + } + if (endOfDoc) { + break; + } + i = 0; + Object[] metrics = new Object[metricReaders.size()]; + for (SequentialDocValuesIterator metricDocValuesIterator : metricReaders) { + metricDocValuesIterator.nextDoc(currentDocId); + metrics[i] = metricDocValuesIterator.value(currentDocId); + i++; + } + StarTreeDocument starTreeDocument = new StarTreeDocument(dims, metrics); + starTreeDocuments.add(starTreeDocument); + currentDocId++; + } + } + StarTreeDocument[] recordsArr = new StarTreeDocument[starTreeDocuments.size()]; + return starTreeDocuments.toArray(recordsArr); + } + @Override public StarTreeDocument getStarTreeDocument(int docId) throws IOException { return starTreeDocuments.get(docId); @@ -70,26 +143,48 @@ public long getDimensionValue(int docId, int dimensionId) throws IOException { return starTreeDocuments.get(docId).dimensions[dimensionId]; } + /** + * Sorts and aggregates all the documents of the segment based on dimension and metrics configuration + * + * @param numDocs number of documents in the given segment + * @param dimensionReaders List of docValues readers to read dimensions from the segment + * @param metricReaders List of docValues readers to read metrics from the segment + * @return Iterator of star-tree documents + * @throws IOException + */ @Override - public Iterator sortAndAggregateStarTreeDocuments(int numDocs) throws IOException { + public Iterator sortAndAggregateSegmentDocuments( + int numDocs, + SequentialDocValuesIterator[] dimensionReaders, + List metricReaders + ) throws IOException { StarTreeDocument[] starTreeDocuments = new StarTreeDocument[numDocs]; for (int currentDocId = 0; currentDocId < numDocs; currentDocId++) { - starTreeDocuments[currentDocId] = getSegmentStarTreeDocument(currentDocId); + starTreeDocuments[currentDocId] = getSegmentStarTreeDocument(currentDocId, dimensionReaders, metricReaders); } return sortAndAggregateStarTreeDocuments(starTreeDocuments); } /** - * Sort, aggregates and merges the star-tree documents + * Sorts and aggregates the star-tree documents + * * @param starTreeDocuments star-tree documents * @return iterator for star-tree documents * @throws IOException throws when unable to sort, merge and aggregate star-tree documents */ public Iterator sortAndAggregateStarTreeDocuments(StarTreeDocument[] starTreeDocuments) throws IOException { - // sort the documents Arrays.sort(starTreeDocuments, (o1, o2) -> { for (int i = 0; i < numDimensions; i++) { + if (o1.dimensions[i] == null && o2.dimensions[i] == null) { + return 0; + } + if (o1.dimensions[i] == null) { + return 1; + } + if (o2.dimensions[i] == null) { + return -1; + } if (o1.dimensions[i] != o2.dimensions[i]) { return Long.compare(o1.dimensions[i], o2.dimensions[i]); } @@ -102,7 +197,8 @@ public Iterator sortAndAggregateStarTreeDocuments(StarTreeDocu } /** - * Merges the star-tree documents + * Merges the star-tree documents based on dimensions + * * @param starTreeDocuments star-tree documents * @return iterator to aggregate star-tree documents */ @@ -138,6 +234,7 @@ public StarTreeDocument next() { /** * Generates a star-tree for a given star-node + * * @param startDocId Start document id in the star-tree * @param endDocId End document id (exclusive) in the star-tree * @param dimensionId Dimension id of the star-node @@ -182,7 +279,7 @@ public boolean hasNext() { @Override public StarTreeDocument next() { StarTreeDocument next = reduceStarTreeDocuments(null, currentStarTreeDocument); - next.dimensions[dimensionId] = Long.valueOf(STAR_IN_DOC_VALUES_INDEX); + next.dimensions[dimensionId] = STAR_IN_DOC_VALUES_INDEX; while (docId < numDocs) { StarTreeDocument starTreeDocument = starTreeDocuments[docId++]; if (!hasSameDimensions(starTreeDocument, currentStarTreeDocument)) { diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreeBuilder.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreeBuilder.java index ef542a1c848f2..988379d7400fe 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreeBuilder.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreeBuilder.java @@ -8,21 +8,38 @@ package org.opensearch.index.compositeindex.datacube.startree.builder; +import org.apache.lucene.codecs.DocValuesProducer; import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.index.codec.composite.datacube.startree.StarTreeValues; import java.io.Closeable; import java.io.IOException; +import java.util.List; +import java.util.Map; /** * A star-tree builder that builds a single star-tree. + * * @opensearch.experimental */ @ExperimentalApi public interface StarTreeBuilder extends Closeable { /** - * Builds the star tree based on star-tree field + * Builds the star tree from the original segment documents + * + * @param fieldProducerMap contains the docValues producer to get docValues associated with each field + * + * @throws IOException when we are unable to build star-tree + */ + + void build(Map fieldProducerMap) throws IOException; + + /** + * Builds the star tree using StarTree values from multiple segments + * + * @param starTreeValuesSubs contains the star tree values from multiple segments * @throws IOException when we are unable to build star-tree */ - void build() throws IOException; + void build(List starTreeValuesSubs) throws IOException; } diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreeDocValuesIteratorAdapter.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreeDocValuesIteratorAdapter.java deleted file mode 100644 index cb0350bb110b0..0000000000000 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreeDocValuesIteratorAdapter.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.index.compositeindex.datacube.startree.builder; - -import org.apache.lucene.codecs.DocValuesProducer; -import org.apache.lucene.index.DocValuesType; -import org.apache.lucene.index.FieldInfo; -import org.apache.lucene.index.SortedNumericDocValues; -import org.apache.lucene.search.DocIdSetIterator; -import org.opensearch.common.annotation.ExperimentalApi; -import org.opensearch.index.compositeindex.datacube.startree.utils.SequentialDocValuesIterator; - -import java.io.IOException; - -/** - * A factory class to return respective doc values iterator based on the doc volues type. - * - * @opensearch.experimental - */ -@ExperimentalApi -public class StarTreeDocValuesIteratorAdapter { - - /** - * Creates an iterator for the given doc values type and field using the doc values producer - */ - public SequentialDocValuesIterator getDocValuesIterator(DocValuesType type, FieldInfo field, DocValuesProducer producer) - throws IOException { - switch (type) { - case SORTED_NUMERIC: - return new SequentialDocValuesIterator(producer.getSortedNumeric(field)); - default: - throw new IllegalArgumentException("Unsupported DocValuesType: " + type); - } - } - - /** - * Returns the next value for the given iterator - */ - public Long getNextValue(SequentialDocValuesIterator sequentialDocValuesIterator, int currentDocId) throws IOException { - if (sequentialDocValuesIterator.getDocIdSetIterator() instanceof SortedNumericDocValues) { - SortedNumericDocValues sortedNumericDocValues = (SortedNumericDocValues) sequentialDocValuesIterator.getDocIdSetIterator(); - if (sequentialDocValuesIterator.getDocId() < 0 || sequentialDocValuesIterator.getDocId() == DocIdSetIterator.NO_MORE_DOCS) { - throw new IllegalStateException("invalid doc id to fetch the next value"); - } - - if (sequentialDocValuesIterator.getDocValue() == null) { - sequentialDocValuesIterator.setDocValue(sortedNumericDocValues.nextValue()); - return sequentialDocValuesIterator.getDocValue(); - } - - if (sequentialDocValuesIterator.getDocId() == currentDocId) { - Long nextValue = sequentialDocValuesIterator.getDocValue(); - sequentialDocValuesIterator.setDocValue(null); - return nextValue; - } else { - return null; - } - } else { - throw new IllegalStateException("Unsupported Iterator: " + sequentialDocValuesIterator.getDocIdSetIterator().toString()); - } - } - - /** - * Moves to the next doc in the iterator - * Returns the doc id for the next document from the given iterator - */ - public int nextDoc(SequentialDocValuesIterator iterator, int currentDocId) throws IOException { - if (iterator.getDocValue() != null) { - return iterator.getDocId(); - } - iterator.setDocId(iterator.getDocIdSetIterator().nextDoc()); - iterator.setDocValue(this.getNextValue(iterator, currentDocId)); - return iterator.getDocId(); - } - -} diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreesBuilder.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreesBuilder.java index 282b5a07972d5..dc24fb8d8f745 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreesBuilder.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreesBuilder.java @@ -13,6 +13,7 @@ import org.apache.lucene.codecs.DocValuesProducer; import org.apache.lucene.index.SegmentWriteState; import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.index.codec.composite.datacube.startree.StarTreeValues; import org.opensearch.index.compositeindex.datacube.startree.StarTreeField; import org.opensearch.index.mapper.CompositeMappedFieldType; import org.opensearch.index.mapper.MapperService; @@ -37,14 +38,9 @@ public class StarTreesBuilder implements Closeable { private final List starTreeFields; private final SegmentWriteState state; - private final Map fieldProducerMap; private final MapperService mapperService; - public StarTreesBuilder( - Map fieldProducerMap, - SegmentWriteState segmentWriteState, - MapperService mapperService - ) { + public StarTreesBuilder(SegmentWriteState segmentWriteState, MapperService mapperService) { List starTreeFields = new ArrayList<>(); for (CompositeMappedFieldType compositeMappedFieldType : mapperService.getCompositeFieldTypes()) { if (compositeMappedFieldType instanceof StarTreeMapper.StarTreeFieldType) { @@ -59,9 +55,7 @@ public StarTreesBuilder( ); } } - this.starTreeFields = starTreeFields; - this.fieldProducerMap = fieldProducerMap; this.state = segmentWriteState; this.mapperService = mapperService; } @@ -69,20 +63,21 @@ public StarTreesBuilder( /** * Builds the star-trees. */ - public void build() throws IOException { + public void build(Map fieldProducerMap) throws IOException { if (starTreeFields.isEmpty()) { logger.debug("no star-tree fields found, returning from star-tree builder"); return; } long startTime = System.currentTimeMillis(); + int numStarTrees = starTreeFields.size(); logger.debug("Starting building {} star-trees with star-tree fields", numStarTrees); // Build all star-trees for (int i = 0; i < numStarTrees; i++) { StarTreeField starTreeField = starTreeFields.get(i); - try (StarTreeBuilder starTreeBuilder = getSingleTreeBuilder(starTreeField, fieldProducerMap, state, mapperService)) { - starTreeBuilder.build(); + try (StarTreeBuilder starTreeBuilder = getSingleTreeBuilder(starTreeField, state, mapperService)) { + starTreeBuilder.build(fieldProducerMap); } } logger.debug("Took {} ms to building {} star-trees with star-tree fields", System.currentTimeMillis() - startTime, numStarTrees); @@ -90,18 +85,35 @@ public void build() throws IOException { @Override public void close() throws IOException { - + // TODO : close files } - private static StarTreeBuilder getSingleTreeBuilder( - StarTreeField starTreeField, - Map fieldProducerMap, - SegmentWriteState state, - MapperService mapperService + /** + * Merges star tree fields from multiple segments + * @param starTreeFieldMap StarTreeField configuration per field + * @param starTreeValuesSubsPerField starTreeValuesSubs per field + * + */ + public void buildDuringMerge( + final Map starTreeFieldMap, + final Map> starTreeValuesSubsPerField ) throws IOException { + for (Map.Entry> entry : starTreeValuesSubsPerField.entrySet()) { + List starTreeValuesList = entry.getValue(); + StarTreeField starTreeField = starTreeFieldMap.get(entry.getKey()); + StarTreeBuilder builder = getSingleTreeBuilder(starTreeField, state, mapperService); + builder.build(starTreeValuesList); + } + } + + /** + * Get star-tree builder based on build mode. + */ + private static StarTreeBuilder getSingleTreeBuilder(StarTreeField starTreeField, SegmentWriteState state, MapperService mapperService) + throws IOException { switch (starTreeField.getStarTreeConfig().getBuildMode()) { case ON_HEAP: - return new OnHeapStarTreeBuilder(starTreeField, fieldProducerMap, state, mapperService); + return new OnHeapStarTreeBuilder(starTreeField, state, mapperService); default: throw new IllegalArgumentException( String.format( diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/StarTreeNode.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/StarTreeNode.java new file mode 100644 index 0000000000000..dfc19d08e3534 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/StarTreeNode.java @@ -0,0 +1,57 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.compositeindex.datacube.startree.node; + +import org.opensearch.common.annotation.ExperimentalApi; + +import java.io.IOException; +import java.util.Iterator; + +/** + * Interface that represents star tree node + * + * @opensearch.experimental + */ +@ExperimentalApi +public interface StarTreeNode { + long ALL = -1l; + + /** Get the index of the dimension. */ + int getDimensionId() throws IOException; + + /** Get the value (dictionary id) of the dimension. */ + long getDimensionValue() throws IOException; + + /** Get the child dimension id. */ + int getChildDimensionId() throws IOException; + + /** Get the index of the start document. */ + int getStartDocId() throws IOException; + + /** Get the index of the end document (exclusive). */ + int getEndDocId() throws IOException; + + /** Get the index of the aggregated document. */ + int getAggregatedDocId() throws IOException; + + /** Get the number of children nodes. */ + int getNumChildren() throws IOException; + + /** Return true if the node is a leaf node, false otherwise. */ + boolean isLeaf(); + + /** + * Get the child node corresponding to the given dimension value (dictionary id), or null if such + * child does not exist. + */ + StarTreeNode getChildForDimensionValue(long dimensionValue) throws IOException; + + /** Get the iterator over all children nodes. */ + Iterator getChildrenIterator() throws IOException; +} diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/package-info.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/package-info.java new file mode 100644 index 0000000000000..516d5b5a012ab --- /dev/null +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Holds classes associated with star tree node + */ +package org.opensearch.index.compositeindex.datacube.startree.node; diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/SequentialDocValuesIterator.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/SequentialDocValuesIterator.java index 1fdbddcc56fa0..387438d5b83f7 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/SequentialDocValuesIterator.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/SequentialDocValuesIterator.java @@ -8,9 +8,12 @@ package org.opensearch.index.compositeindex.datacube.startree.utils; +import org.apache.lucene.index.SortedNumericDocValues; import org.apache.lucene.search.DocIdSetIterator; import org.opensearch.common.annotation.ExperimentalApi; +import java.io.IOException; + /** * Coordinates the reading of documents across multiple DocIdSetIterators. * It encapsulates a single DocIdSetIterator and maintains the latest document ID and its associated value. @@ -32,7 +35,7 @@ public class SequentialDocValuesIterator { /** * The id of the latest document. */ - private int docId; + private int docId = -1; /** * Constructs a new SequentialDocValuesIterator instance with the given DocIdSetIterator. @@ -87,4 +90,42 @@ public void setDocId(int docId) { public DocIdSetIterator getDocIdSetIterator() { return docIdSetIterator; } + + public int nextDoc(int currentDocId) throws IOException { + // if doc id stored is less than or equal to the requested doc id , return the stored doc id + if (docId >= currentDocId) { + return docId; + } + setDocId(this.docIdSetIterator.nextDoc()); + return docId; + } + + public Long value(int currentDocId) throws IOException { + if (this.getDocIdSetIterator() instanceof SortedNumericDocValues) { + SortedNumericDocValues sortedNumericDocValues = (SortedNumericDocValues) this.getDocIdSetIterator(); + if (currentDocId < 0) { + throw new IllegalStateException("invalid doc id to fetch the next value"); + } + if (currentDocId == DocIdSetIterator.NO_MORE_DOCS) { + throw new IllegalStateException("DocValuesIterator is already exhausted"); + } + + if (docId == DocIdSetIterator.NO_MORE_DOCS) { + return null; + } + + if (docValue == null) { + setDocValue(sortedNumericDocValues.nextValue()); + } + if (docId == currentDocId) { + Long nextValue = docValue; + docValue = null; + return nextValue; + } else { + return null; + } + } else { + throw new IllegalStateException("Unsupported Iterator requested for SequentialDocValuesIterator"); + } + } } diff --git a/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/MetricAggregatorInfoTests.java b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/MetricAggregatorInfoTests.java index d08f637a3f0a9..73e6aeb44cfd7 100644 --- a/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/MetricAggregatorInfoTests.java +++ b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/MetricAggregatorInfoTests.java @@ -19,8 +19,7 @@ public void testConstructor() { MetricStat.SUM, "column1", "star_tree_field", - IndexNumericFieldData.NumericType.DOUBLE, - null + IndexNumericFieldData.NumericType.DOUBLE ); assertEquals(MetricStat.SUM, pair.getMetricStat()); assertEquals("column1", pair.getField()); @@ -31,8 +30,7 @@ public void testCountStarConstructor() { MetricStat.COUNT, "anything", "star_tree_field", - IndexNumericFieldData.NumericType.DOUBLE, - null + IndexNumericFieldData.NumericType.DOUBLE ); assertEquals(MetricStat.COUNT, pair.getMetricStat()); assertEquals("anything", pair.getField()); @@ -43,8 +41,7 @@ public void testToFieldName() { MetricStat.SUM, "column2", "star_tree_field", - IndexNumericFieldData.NumericType.DOUBLE, - null + IndexNumericFieldData.NumericType.DOUBLE ); assertEquals("star_tree_field_column2_sum", pair.toFieldName()); } @@ -54,24 +51,22 @@ public void testEquals() { MetricStat.SUM, "column1", "star_tree_field", - IndexNumericFieldData.NumericType.DOUBLE, - null + IndexNumericFieldData.NumericType.DOUBLE ); MetricAggregatorInfo pair2 = new MetricAggregatorInfo( MetricStat.SUM, "column1", "star_tree_field", - IndexNumericFieldData.NumericType.DOUBLE, - null + IndexNumericFieldData.NumericType.DOUBLE ); assertEquals(pair1, pair2); assertNotEquals( pair1, - new MetricAggregatorInfo(MetricStat.COUNT, "column1", "star_tree_field", IndexNumericFieldData.NumericType.DOUBLE, null) + new MetricAggregatorInfo(MetricStat.COUNT, "column1", "star_tree_field", IndexNumericFieldData.NumericType.DOUBLE) ); assertNotEquals( pair1, - new MetricAggregatorInfo(MetricStat.SUM, "column2", "star_tree_field", IndexNumericFieldData.NumericType.DOUBLE, null) + new MetricAggregatorInfo(MetricStat.SUM, "column2", "star_tree_field", IndexNumericFieldData.NumericType.DOUBLE) ); } @@ -80,15 +75,13 @@ public void testHashCode() { MetricStat.SUM, "column1", "star_tree_field", - IndexNumericFieldData.NumericType.DOUBLE, - null + IndexNumericFieldData.NumericType.DOUBLE ); MetricAggregatorInfo pair2 = new MetricAggregatorInfo( MetricStat.SUM, "column1", "star_tree_field", - IndexNumericFieldData.NumericType.DOUBLE, - null + IndexNumericFieldData.NumericType.DOUBLE ); assertEquals(pair1.hashCode(), pair2.hashCode()); } @@ -98,22 +91,19 @@ public void testCompareTo() { MetricStat.SUM, "column1", "star_tree_field", - IndexNumericFieldData.NumericType.DOUBLE, - null + IndexNumericFieldData.NumericType.DOUBLE ); MetricAggregatorInfo pair2 = new MetricAggregatorInfo( MetricStat.SUM, "column2", "star_tree_field", - IndexNumericFieldData.NumericType.DOUBLE, - null + IndexNumericFieldData.NumericType.DOUBLE ); MetricAggregatorInfo pair3 = new MetricAggregatorInfo( MetricStat.COUNT, "column1", "star_tree_field", - IndexNumericFieldData.NumericType.DOUBLE, - null + IndexNumericFieldData.NumericType.DOUBLE ); assertTrue(pair1.compareTo(pair2) < 0); assertTrue(pair2.compareTo(pair1) > 0); diff --git a/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilderTests.java b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilderTests.java index 10f4f41b27f6f..ee96c45be931b 100644 --- a/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilderTests.java +++ b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilderTests.java @@ -23,6 +23,7 @@ import org.apache.lucene.util.InfoStream; import org.apache.lucene.util.Version; import org.opensearch.common.settings.Settings; +import org.opensearch.index.codec.composite.datacube.startree.StarTreeValues; import org.opensearch.index.compositeindex.datacube.Dimension; import org.opensearch.index.compositeindex.datacube.Metric; import org.opensearch.index.compositeindex.datacube.MetricStat; @@ -31,6 +32,7 @@ import org.opensearch.index.compositeindex.datacube.startree.StarTreeField; import org.opensearch.index.compositeindex.datacube.startree.StarTreeFieldConfiguration; import org.opensearch.index.compositeindex.datacube.startree.aggregators.MetricAggregatorInfo; +import org.opensearch.index.compositeindex.datacube.startree.utils.SequentialDocValuesIterator; import org.opensearch.index.fielddata.IndexNumericFieldData; import org.opensearch.index.mapper.ContentPath; import org.opensearch.index.mapper.DocumentMapper; @@ -156,7 +158,10 @@ public static void setup() throws IOException { ); when(documentMapper.mappers()).thenReturn(fieldMappers); - builder = new BaseStarTreeBuilder(starTreeField, fieldProducerMap, state, mapperService) { + builder = new BaseStarTreeBuilder(starTreeField, state, mapperService) { + @Override + public void build(List starTreeValuesSubs) throws IOException {} + @Override public void appendStarTreeDocument(StarTreeDocument starTreeDocument) throws IOException {} @@ -176,7 +181,11 @@ public long getDimensionValue(int docId, int dimensionId) throws IOException { } @Override - public Iterator sortAndAggregateStarTreeDocuments(int numDocs) throws IOException { + public Iterator sortAndAggregateSegmentDocuments( + int numDocs, + SequentialDocValuesIterator[] dimensionReaders, + List metricReaders + ) throws IOException { return null; } @@ -188,11 +197,11 @@ public Iterator generateStarTreeDocumentsForStarNode(int start }; } - public void test_generateMetricAggregatorInfos() throws IOException { - List metricAggregatorInfos = builder.generateMetricAggregatorInfos(mapperService, state); + public void test_generateMetricAggregatorInfos() { + List metricAggregatorInfos = builder.generateMetricAggregatorInfos(mapperService); List expectedMetricAggregatorInfos = List.of( - new MetricAggregatorInfo(MetricStat.SUM, "field2", starTreeField.getName(), IndexNumericFieldData.NumericType.DOUBLE, null), - new MetricAggregatorInfo(MetricStat.SUM, "field4", starTreeField.getName(), IndexNumericFieldData.NumericType.DOUBLE, null) + new MetricAggregatorInfo(MetricStat.SUM, "field2", starTreeField.getName(), IndexNumericFieldData.NumericType.DOUBLE), + new MetricAggregatorInfo(MetricStat.SUM, "field4", starTreeField.getName(), IndexNumericFieldData.NumericType.DOUBLE) ); assertEquals(metricAggregatorInfos, expectedMetricAggregatorInfos); } diff --git a/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/OnHeapStarTreeBuilderTests.java b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/OnHeapStarTreeBuilderTests.java index 146fd97ce5ee4..e70ccae682e16 100644 --- a/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/OnHeapStarTreeBuilderTests.java +++ b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/OnHeapStarTreeBuilderTests.java @@ -16,14 +16,17 @@ import org.apache.lucene.index.IndexOptions; import org.apache.lucene.index.SegmentInfo; import org.apache.lucene.index.SegmentWriteState; +import org.apache.lucene.index.SortedNumericDocValues; import org.apache.lucene.index.VectorEncoding; import org.apache.lucene.index.VectorSimilarityFunction; import org.apache.lucene.sandbox.document.HalfFloatPoint; +import org.apache.lucene.search.DocIdSetIterator; import org.apache.lucene.store.Directory; import org.apache.lucene.util.InfoStream; import org.apache.lucene.util.NumericUtils; import org.apache.lucene.util.Version; import org.opensearch.common.settings.Settings; +import org.opensearch.index.codec.composite.datacube.startree.StarTreeValues; import org.opensearch.index.compositeindex.datacube.Dimension; import org.opensearch.index.compositeindex.datacube.Metric; import org.opensearch.index.compositeindex.datacube.MetricStat; @@ -44,6 +47,7 @@ import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -160,7 +164,7 @@ public void setup() throws IOException { null ); when(documentMapper.mappers()).thenReturn(fieldMappers); - builder = new OnHeapStarTreeBuilder(compositeField, fieldProducerMap, writeState, mapperService); + builder = new OnHeapStarTreeBuilder(compositeField, writeState, mapperService); } public void test_sortAndAggregateStarTreeDocuments() throws IOException { @@ -361,7 +365,7 @@ public void test_build_halfFloatMetrics() throws IOException { null ); when(documentMapper.mappers()).thenReturn(fieldMappers); - builder = new OnHeapStarTreeBuilder(compositeField, fieldProducerMap, writeState, mapperService); + builder = new OnHeapStarTreeBuilder(compositeField, writeState, mapperService); int noOfStarTreeDocuments = 5; StarTreeDocument[] starTreeDocuments = new StarTreeDocument[noOfStarTreeDocuments]; @@ -403,7 +407,6 @@ public void test_build_halfFloatMetrics() throws IOException { Iterator segmentStarTreeDocumentIterator = builder.sortAndAggregateStarTreeDocuments(segmentStarTreeDocuments); builder.build(segmentStarTreeDocumentIterator); - List resultStarTreeDocuments = builder.getStarTreeDocuments(); assertEquals(8, resultStarTreeDocuments.size()); @@ -431,7 +434,7 @@ public void test_build_floatMetrics() throws IOException { null ); when(documentMapper.mappers()).thenReturn(fieldMappers); - builder = new OnHeapStarTreeBuilder(compositeField, fieldProducerMap, writeState, mapperService); + builder = new OnHeapStarTreeBuilder(compositeField, writeState, mapperService); int noOfStarTreeDocuments = 5; StarTreeDocument[] starTreeDocuments = new StarTreeDocument[noOfStarTreeDocuments]; @@ -480,7 +483,7 @@ public void test_build_longMetrics() throws IOException { null ); when(documentMapper.mappers()).thenReturn(fieldMappers); - builder = new OnHeapStarTreeBuilder(compositeField, fieldProducerMap, writeState, mapperService); + builder = new OnHeapStarTreeBuilder(compositeField, writeState, mapperService); int noOfStarTreeDocuments = 5; StarTreeDocument[] starTreeDocuments = new StarTreeDocument[noOfStarTreeDocuments]; @@ -513,12 +516,12 @@ private static Iterator getExpectedStarTreeDocumentIterator() List expectedStarTreeDocuments = List.of( new StarTreeDocument(new Long[] { 2L, 4L, 3L, 4L }, new Object[] { 21.0, 14.0, 2L }), new StarTreeDocument(new Long[] { 3L, 4L, 2L, 1L }, new Object[] { 35.0, 34.0, 3L }), - new StarTreeDocument(new Long[] { -1L, 4L, 2L, 1L }, new Object[] { 35.0, 34.0, 3L }), - new StarTreeDocument(new Long[] { -1L, 4L, 3L, 4L }, new Object[] { 21.0, 14.0, 2L }), - new StarTreeDocument(new Long[] { -1L, 4L, -1L, 1L }, new Object[] { 35.0, 34.0, 3L }), - new StarTreeDocument(new Long[] { -1L, 4L, -1L, 4L }, new Object[] { 21.0, 14.0, 2L }), - new StarTreeDocument(new Long[] { -1L, 4L, -1L, -1L }, new Object[] { 56.0, 48.0, 5L }), - new StarTreeDocument(new Long[] { -1L, -1L, -1L, -1L }, new Object[] { 56.0, 48.0, 5L }) + new StarTreeDocument(new Long[] { null, 4L, 2L, 1L }, new Object[] { 35.0, 34.0, 3L }), + new StarTreeDocument(new Long[] { null, 4L, 3L, 4L }, new Object[] { 21.0, 14.0, 2L }), + new StarTreeDocument(new Long[] { null, 4L, null, 1L }, new Object[] { 35.0, 34.0, 3L }), + new StarTreeDocument(new Long[] { null, 4L, null, 4L }, new Object[] { 21.0, 14.0, 2L }), + new StarTreeDocument(new Long[] { null, 4L, null, null }, new Object[] { 56.0, 48.0, 5L }), + new StarTreeDocument(new Long[] { null, null, null, null }, new Object[] { 56.0, 48.0, 5L }) ); return expectedStarTreeDocuments.iterator(); } @@ -571,6 +574,118 @@ private static void assertStarTreeDocuments( } } + public void testMergeFlow() throws IOException { + List dimList = List.of(0L, 1L, 3L, 4L, 5L); + List docsWithField = List.of(0, 1, 3, 4, 5); + List dimList2 = List.of(0L, 1L, 2L, 3L, 4L, 5L); + List docsWithField2 = List.of(0, 1, 2, 3, 4, 5); + + List metricsList = List.of( + getLongFromDouble(0.0), + getLongFromDouble(10.0), + getLongFromDouble(20.0), + getLongFromDouble(30.0), + getLongFromDouble(40.0), + getLongFromDouble(50.0) + ); + List metricsWithField = List.of(0, 1, 2, 3, 4, 5); + + Dimension d1 = new NumericDimension("field1"); + Dimension d2 = new NumericDimension("field3"); + Metric m1 = new Metric("field2", List.of(MetricStat.SUM)); + List dims = List.of(d1, d2); + List metrics = List.of(m1); + StarTreeFieldConfiguration c = new StarTreeFieldConfiguration( + 1000, + new HashSet<>(), + StarTreeFieldConfiguration.StarTreeBuildMode.ON_HEAP + ); + StarTreeField sf = new StarTreeField("sf", dims, metrics, c); + SortedNumericDocValues d1sndv = getSortedNumericMock(dimList, docsWithField); + SortedNumericDocValues d2sndv = getSortedNumericMock(dimList2, docsWithField2); + SortedNumericDocValues m1sndv = getSortedNumericMock(metricsList, metricsWithField); + Map dimDocIdSetIterators = Map.of("field1", d1sndv, "field3", d2sndv); + Map metricDocIdSetIterators = Map.of("field2", m1sndv); + StarTreeValues starTreeValues = new StarTreeValues(sf, null, dimDocIdSetIterators, metricDocIdSetIterators); + + SortedNumericDocValues f2d1sndv = getSortedNumericMock(dimList, docsWithField); + SortedNumericDocValues f2d2sndv = getSortedNumericMock(dimList2, docsWithField2); + SortedNumericDocValues f2m1sndv = getSortedNumericMock(metricsList, metricsWithField); + Map f2dimDocIdSetIterators = Map.of("field1", f2d1sndv, "field3", f2d2sndv); + Map f2metricDocIdSetIterators = Map.of("field2", f2m1sndv); + StarTreeValues starTreeValues2 = new StarTreeValues(sf, null, f2dimDocIdSetIterators, f2metricDocIdSetIterators); + OnHeapStarTreeBuilder builder = new OnHeapStarTreeBuilder(sf, writeState, mapperService); + Iterator starTreeDocumentIterator = builder.mergeStarTrees(List.of(starTreeValues, starTreeValues2)); + /** + * Asserting following dim / metrics [ dim1, dim2 / Sum [ metric] ] + * [0, 0] | [0.0] + * [1, 1] | [20.0] + * [3, 3] | [60.0] + * [4, 4] | [80.0] + * [5, 5] | [100.0] + * [null, 2] | [40.0] + */ + while (starTreeDocumentIterator.hasNext()) { + StarTreeDocument starTreeDocument = starTreeDocumentIterator.next(); + assertEquals( + starTreeDocument.dimensions[0] != null ? starTreeDocument.dimensions[0] * 2 * 10.0 : 40.0, + starTreeDocument.metrics[0] + ); + } + } + + private Long getLongFromDouble(Double num) { + if (num == null) { + return null; + } + return NumericUtils.doubleToSortableLong(num); + } + + private SortedNumericDocValues getSortedNumericMock(List dimList, List docsWithField) { + return new SortedNumericDocValues() { + int index = -1; + + @Override + public long nextValue() throws IOException { + return dimList.get(index); + } + + @Override + public int docValueCount() { + return 0; + } + + @Override + public boolean advanceExact(int target) throws IOException { + return false; + } + + @Override + public int docID() { + return index; + } + + @Override + public int nextDoc() throws IOException { + if (index == docsWithField.size() - 1) { + return NO_MORE_DOCS; + } + index++; + return docsWithField.get(index); + } + + @Override + public int advance(int target) throws IOException { + return 0; + } + + @Override + public long cost() { + return 0; + } + }; + } + @Override public void tearDown() throws Exception { super.tearDown(); diff --git a/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreeValuesIteratorFactoryTests.java b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/SequentialIteratorTests.java similarity index 70% rename from server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreeValuesIteratorFactoryTests.java rename to server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/SequentialIteratorTests.java index 1aba67533d52e..8f9943a4e73ef 100644 --- a/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreeValuesIteratorFactoryTests.java +++ b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/SequentialIteratorTests.java @@ -9,6 +9,7 @@ package org.opensearch.index.compositeindex.datacube.startree.builder; import org.apache.lucene.codecs.DocValuesProducer; +import org.apache.lucene.index.BinaryDocValues; import org.apache.lucene.index.DocValuesType; import org.apache.lucene.index.FieldInfo; import org.apache.lucene.index.IndexOptions; @@ -16,6 +17,7 @@ import org.apache.lucene.index.VectorEncoding; import org.apache.lucene.index.VectorSimilarityFunction; import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.util.BytesRef; import org.opensearch.index.compositeindex.datacube.startree.utils.SequentialDocValuesIterator; import org.opensearch.test.OpenSearchTestCase; import org.junit.BeforeClass; @@ -27,14 +29,12 @@ import static org.mockito.Mockito.when; -public class StarTreeValuesIteratorFactoryTests extends OpenSearchTestCase { +public class SequentialIteratorTests extends OpenSearchTestCase { - private static StarTreeDocValuesIteratorAdapter starTreeDocValuesIteratorAdapter; private static FieldInfo mockFieldInfo; @BeforeClass public static void setup() { - starTreeDocValuesIteratorAdapter = new StarTreeDocValuesIteratorAdapter(); mockFieldInfo = new FieldInfo( "field", 1, @@ -60,20 +60,24 @@ public void testCreateIterator_SortedNumeric() throws IOException { DocValuesProducer producer = Mockito.mock(DocValuesProducer.class); SortedNumericDocValues iterator = Mockito.mock(SortedNumericDocValues.class); when(producer.getSortedNumeric(mockFieldInfo)).thenReturn(iterator); - SequentialDocValuesIterator result = starTreeDocValuesIteratorAdapter.getDocValuesIterator( - DocValuesType.SORTED_NUMERIC, - mockFieldInfo, - producer - ); + SequentialDocValuesIterator result = new SequentialDocValuesIterator(producer.getSortedNumeric(mockFieldInfo)); assertEquals(iterator.getClass(), result.getDocIdSetIterator().getClass()); } - public void testCreateIterator_UnsupportedType() { + public void testCreateIterator_UnsupportedType() throws IOException { DocValuesProducer producer = Mockito.mock(DocValuesProducer.class); - IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> { - starTreeDocValuesIteratorAdapter.getDocValuesIterator(DocValuesType.BINARY, mockFieldInfo, producer); + BinaryDocValues iterator = Mockito.mock(BinaryDocValues.class); + when(producer.getBinary(mockFieldInfo)).thenReturn(iterator); + SequentialDocValuesIterator result = new SequentialDocValuesIterator(producer.getBinary(mockFieldInfo)); + assertEquals(iterator.getClass(), result.getDocIdSetIterator().getClass()); + when(iterator.nextDoc()).thenReturn(0); + when(iterator.binaryValue()).thenReturn(new BytesRef("123")); + + IllegalStateException exception = expectThrows(IllegalStateException.class, () -> { + result.nextDoc(0); + result.value(0); }); - assertEquals("Unsupported DocValuesType: BINARY", exception.getMessage()); + assertEquals("Unsupported Iterator requested for SequentialDocValuesIterator", exception.getMessage()); } public void testGetNextValue_SortedNumeric() throws IOException { @@ -81,8 +85,8 @@ public void testGetNextValue_SortedNumeric() throws IOException { when(iterator.nextDoc()).thenReturn(0); when(iterator.nextValue()).thenReturn(123L); SequentialDocValuesIterator sequentialDocValuesIterator = new SequentialDocValuesIterator(iterator); - sequentialDocValuesIterator.getDocIdSetIterator().nextDoc(); - long result = starTreeDocValuesIteratorAdapter.getNextValue(sequentialDocValuesIterator, 0); + sequentialDocValuesIterator.nextDoc(0); + long result = sequentialDocValuesIterator.value(0); assertEquals(123L, result); } @@ -90,10 +94,8 @@ public void testGetNextValue_UnsupportedIterator() { DocIdSetIterator iterator = Mockito.mock(DocIdSetIterator.class); SequentialDocValuesIterator sequentialDocValuesIterator = new SequentialDocValuesIterator(iterator); - IllegalStateException exception = expectThrows(IllegalStateException.class, () -> { - starTreeDocValuesIteratorAdapter.getNextValue(sequentialDocValuesIterator, 0); - }); - assertEquals("Unsupported Iterator: " + iterator.toString(), exception.getMessage()); + IllegalStateException exception = expectThrows(IllegalStateException.class, () -> { sequentialDocValuesIterator.value(0); }); + assertEquals("Unsupported Iterator requested for SequentialDocValuesIterator", exception.getMessage()); } public void testNextDoc() throws IOException { @@ -101,7 +103,7 @@ public void testNextDoc() throws IOException { SequentialDocValuesIterator sequentialDocValuesIterator = new SequentialDocValuesIterator(iterator); when(iterator.nextDoc()).thenReturn(5); - int result = starTreeDocValuesIteratorAdapter.nextDoc(sequentialDocValuesIterator, 5); + int result = sequentialDocValuesIterator.nextDoc(5); assertEquals(5, result); } @@ -118,13 +120,13 @@ public void test_multipleCoordinatedDocumentReader() throws IOException { when(iterator1.nextValue()).thenReturn(9L); when(iterator2.nextValue()).thenReturn(9L); - starTreeDocValuesIteratorAdapter.nextDoc(sequentialDocValuesIterator1, 0); - starTreeDocValuesIteratorAdapter.nextDoc(sequentialDocValuesIterator2, 0); + sequentialDocValuesIterator1.nextDoc(0); + sequentialDocValuesIterator2.nextDoc(0); assertEquals(0, sequentialDocValuesIterator1.getDocId()); - assertEquals(9L, (long) sequentialDocValuesIterator1.getDocValue()); + assertEquals(9L, (long) sequentialDocValuesIterator1.value(0)); assertNotEquals(0, sequentialDocValuesIterator2.getDocId()); assertEquals(1, sequentialDocValuesIterator2.getDocId()); - assertEquals(9L, (long) sequentialDocValuesIterator2.getDocValue()); + assertEquals(9L, (long) sequentialDocValuesIterator2.value(1)); }