From faf2a1f0fad3c8d36df7f1b6d9f95fd02919eed2 Mon Sep 17 00:00:00 2001 From: Sarthak Aggarwal Date: Wed, 17 Jul 2024 19:26:09 +0530 Subject: [PATCH] improvements and fixes Signed-off-by: Sarthak Aggarwal --- .../SortedNumericDocValuesWriterHelper.java | 3 +- .../composite/Composite99DocValuesReader.java | 5 +-- .../composite/Composite99DocValuesWriter.java | 3 +- .../aggregators/CountValueAggregator.java | 5 +++ .../aggregators/MaxValueAggregator.java | 5 +++ .../aggregators/MinValueAggregator.java | 5 +++ .../aggregators/SumValueAggregator.java | 5 +++ .../startree/aggregators/ValueAggregator.java | 5 +++ .../startree/builder/BaseStarTreeBuilder.java | 32 +++++-------- .../startree/node/OffHeapStarTree.java | 5 +-- .../utils/StarTreeDataSerializer.java | 3 +- .../StarTreeDocValuesFormatTests.java | 9 ++-- .../builder/OnHeapStarTreeBuilderTests.java | 45 +++++++++++++++---- 13 files changed, 84 insertions(+), 46 deletions(-) diff --git a/server/src/main/java/org/apache/lucene/index/SortedNumericDocValuesWriterHelper.java b/server/src/main/java/org/apache/lucene/index/SortedNumericDocValuesWriterHelper.java index df489c3f27442..78a89d9179c25 100644 --- a/server/src/main/java/org/apache/lucene/index/SortedNumericDocValuesWriterHelper.java +++ b/server/src/main/java/org/apache/lucene/index/SortedNumericDocValuesWriterHelper.java @@ -28,11 +28,10 @@ public class SortedNumericDocValuesWriterHelper { * @param fieldInfo the field information for the field being written * @param counter a counter for tracking memory usage */ - public SortedNumericDocValuesWriterHelper(FieldInfo fieldInfo, Counter counter){ + public SortedNumericDocValuesWriterHelper(FieldInfo fieldInfo, Counter counter) { sortedNumericDocValuesWriter = new SortedNumericDocValuesWriter(fieldInfo, counter); } - /** * Adds a value to the sorted numeric doc values for the specified document. * diff --git a/server/src/main/java/org/opensearch/index/codec/composite/Composite99DocValuesReader.java b/server/src/main/java/org/opensearch/index/codec/composite/Composite99DocValuesReader.java index 2f032aa8903ec..6d0aad5be6c13 100644 --- a/server/src/main/java/org/opensearch/index/codec/composite/Composite99DocValuesReader.java +++ b/server/src/main/java/org/opensearch/index/codec/composite/Composite99DocValuesReader.java @@ -115,7 +115,7 @@ public Composite99DocValuesReader(DocValuesProducer producer, SegmentReadState r while (true) { - //validate magic marker + // validate magic marker long magicMarker = metaIn.readLong(); if (magicMarker == -1) { logger.info("EOF reached for composite index metadata"); @@ -236,8 +236,7 @@ public List getCompositeIndexFields() { } @Override - public CompositeIndexValues getCompositeIndexValues(CompositeIndexFieldInfo compositeIndexFieldInfo) throws - IOException { + public CompositeIndexValues getCompositeIndexValues(CompositeIndexFieldInfo compositeIndexFieldInfo) throws IOException { switch (compositeIndexFieldInfo.getType()) { case STAR_TREE: diff --git a/server/src/main/java/org/opensearch/index/codec/composite/Composite99DocValuesWriter.java b/server/src/main/java/org/opensearch/index/codec/composite/Composite99DocValuesWriter.java index c3a924b34bc98..86eaf96c454af 100644 --- a/server/src/main/java/org/opensearch/index/codec/composite/Composite99DocValuesWriter.java +++ b/server/src/main/java/org/opensearch/index/codec/composite/Composite99DocValuesWriter.java @@ -59,7 +59,8 @@ public class Composite99DocValuesWriter extends DocValuesConsumer { private final Map fieldProducerMap = new HashMap<>(); private static final Logger logger = LogManager.getLogger(Composite99DocValuesWriter.class); - public Composite99DocValuesWriter(DocValuesConsumer delegate, SegmentWriteState segmentWriteState, MapperService mapperService) throws IOException { + public Composite99DocValuesWriter(DocValuesConsumer delegate, SegmentWriteState segmentWriteState, MapperService mapperService) + throws IOException { this.delegate = delegate; this.state = segmentWriteState; diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/CountValueAggregator.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/CountValueAggregator.java index bbf35448fbb1a..d7485b809f469 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/CountValueAggregator.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/CountValueAggregator.java @@ -63,4 +63,9 @@ public Long toLongValue(Long value) { public Long toStarTreeNumericTypeValue(Long value) { return value; } + + @Override + public long getIdempotentMetricValue() { + return 0L; + } } diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/MaxValueAggregator.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/MaxValueAggregator.java index 2c3571ae3893c..13fd64e024e24 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/MaxValueAggregator.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/MaxValueAggregator.java @@ -72,4 +72,9 @@ public Double toStarTreeNumericTypeValue(Long value) { throw new IllegalStateException("Cannot convert " + value + " to sortable aggregation type", e); } } + + @Override + public long getIdempotentMetricValue() { + return Long.MIN_VALUE; + } } diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/MinValueAggregator.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/MinValueAggregator.java index 8c8529ff130cc..0ca9e5d5b63e9 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/MinValueAggregator.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/MinValueAggregator.java @@ -72,4 +72,9 @@ public Double toStarTreeNumericTypeValue(Long value) { throw new IllegalStateException("Cannot convert " + value + " to sortable aggregation type", e); } } + + @Override + public long getIdempotentMetricValue() { + return Long.MAX_VALUE; + } } diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/SumValueAggregator.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/SumValueAggregator.java index 6b8c412bdd461..8ad0d1e6e5baf 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/SumValueAggregator.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/SumValueAggregator.java @@ -94,4 +94,9 @@ public Double toStarTreeNumericTypeValue(Long value) { throw new IllegalStateException("Cannot convert " + value + " to sortable aggregation type", e); } } + + @Override + public long getIdempotentMetricValue() { + return 0; + } } diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/ValueAggregator.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/ValueAggregator.java index 5a3c7451c4c6a..21764ace79915 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/ValueAggregator.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/aggregators/ValueAggregator.java @@ -61,4 +61,9 @@ public interface ValueAggregator { * Converts an aggregated value from a Long type. */ A toStarTreeNumericTypeValue(Long rawValue); + + /** + * Fetches an value that does not alter the result of aggregations + */ + long getIdempotentMetricValue(); } diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilder.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilder.java index da538408cc03b..d5c2f8ef4c20a 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilder.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilder.java @@ -7,7 +7,6 @@ */ package org.opensearch.index.compositeindex.datacube.startree.builder; -import java.time.temporal.ChronoField; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.lucene.codecs.DocValuesConsumer; @@ -21,12 +20,10 @@ import org.apache.lucene.index.SortedNumericDocValuesWriterHelper; import org.apache.lucene.index.VectorEncoding; import org.apache.lucene.index.VectorSimilarityFunction; -import org.opensearch.common.time.DateUtils; import org.apache.lucene.store.IndexOutput; import org.apache.lucene.util.Counter; import org.apache.lucene.util.NumericUtils; -import org.apache.lucene.index.FieldInfo; -import org.apache.lucene.index.SegmentWriteState; +import org.opensearch.common.time.DateUtils; import org.opensearch.index.compositeindex.datacube.Dimension; import org.opensearch.index.compositeindex.datacube.Metric; import org.opensearch.index.compositeindex.datacube.MetricStat; @@ -45,6 +42,7 @@ import org.opensearch.index.mapper.NumberFieldMapper; import java.io.IOException; +import java.time.temporal.ChronoField; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -58,7 +56,6 @@ import static org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeHelper.fullFieldNameForStarTreeDimensionsDocValues; import static org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeHelper.fullFieldNameForStarTreeMetricsDocValues; - import static org.opensearch.index.compositeindex.datacube.startree.utils.TreeNode.ALL; /** @@ -220,10 +217,7 @@ public void build( fieldProducerMap.get(dimensionFieldInfo.name).getSortedNumeric(dimensionFieldInfo) ); } - Iterator starTreeDocumentIterator = sortAndAggregateSegmentDocuments( - dimensionReaders, - metricReaders - ); + Iterator starTreeDocumentIterator = sortAndAggregateSegmentDocuments(dimensionReaders, metricReaders); logger.debug("Sorting and aggregating star-tree in ms : {}", (System.currentTimeMillis() - startTime)); build(starTreeDocumentIterator, fieldNumberAcrossStarTrees, starTreeDocValuesConsumer); logger.debug("Finished Building star-tree in ms : {}", (System.currentTimeMillis() - startTime)); @@ -445,7 +439,6 @@ public SortedNumericDocValues getSortedNumeric(FieldInfo field) { */ public abstract StarTreeDocument getStarTreeDocument(int docId) throws IOException; - public abstract StarTreeDocument getStarTreeDocumentForCreatingDocValues(int docId) throws IOException; /** @@ -490,7 +483,6 @@ public abstract Iterator generateStarTreeDocumentsForStarNode( /** * Returns the star-tree document from the segment based on the current doc id - * */ protected StarTreeDocument getSegmentStarTreeDocument( int currentDocId, @@ -582,7 +574,7 @@ protected StarTreeDocument reduceSegmentStarTreeDocuments( metrics[i] = metricValueAggregator.getInitialAggregatedValue(segmentDocument.metrics[i]); } else { metrics[i] = metricValueAggregator.getInitialAggregatedValueForSegmentDocValue( - getLong(segmentDocument.metrics[i]), + getLong(segmentDocument.metrics[i], metricValueAggregator.getIdempotentMetricValue()), starTreeNumericType ); } @@ -606,7 +598,7 @@ protected StarTreeDocument reduceSegmentStarTreeDocuments( } else { aggregatedSegmentDocument.metrics[i] = metricValueAggregator.mergeAggregatedValueAndSegmentValue( aggregatedSegmentDocument.metrics[i], - getLong(segmentDocument.metrics[i]), + getLong(segmentDocument.metrics[i], metricValueAggregator.getIdempotentMetricValue()), starTreeNumericType ); } @@ -622,25 +614,26 @@ protected StarTreeDocument reduceSegmentStarTreeDocuments( /** * Safely converts the metric value of object type to long. * - * @param metric value of the metric + * @param metric value of the metric + * @param idempotentMetricValue * @return converted metric value to long */ - private static long getLong(Object metric) { + private static long getLong(Object metric, long idempotentMetricValue) { - Long metricValue = null; + Long metricValue; try { if (metric instanceof Long) { metricValue = (long) metric; } else if (metric != null) { metricValue = Long.valueOf(String.valueOf(metric)); + } else { + logger.debug("metric value is null, returning idempotent metric value for the aggregator"); + return idempotentMetricValue; } } catch (Exception e) { throw new IllegalStateException("unable to cast segment metric", e); } - if (metricValue == null) { - throw new IllegalStateException("unable to cast segment metric"); - } return metricValue; } @@ -688,7 +681,6 @@ public StarTreeDocument reduceStarTreeDocuments(StarTreeDocument aggregatedDocum * Builds the star tree from the original segment documents * * @param fieldProducerMap contain s the docValues producer to get docValues associated with each field - * * @throws IOException when we are unable to build star-tree */ public void build(Map fieldProducerMap) throws IOException { diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/OffHeapStarTree.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/OffHeapStarTree.java index cfc280643d202..c34cdc0b45ea4 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/OffHeapStarTree.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/node/OffHeapStarTree.java @@ -41,10 +41,7 @@ public OffHeapStarTree(IndexInput data, StarTreeMetadata starTreeMetadata) throw } numNodes = data.readInt(); // num nodes - RandomAccessInput in = data.randomAccessSlice( - data.getFilePointer(), - starTreeMetadata.getDataLength() - ); + RandomAccessInput in = data.randomAccessSlice(data.getFilePointer(), starTreeMetadata.getDataLength()); root = new OffHeapStarTreeNode(in, 0); } diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeDataSerializer.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeDataSerializer.java index 898234cdacc86..a4c3e377125d4 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeDataSerializer.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/utils/StarTreeDataSerializer.java @@ -123,8 +123,7 @@ private static void writeStarTreeNodes(IndexOutput output, TreeNode rootNode) th * @param lastChildId the ID of the last child node * @throws IOException if an I/O error occurs while writing the node */ - private static void writeStarTreeNode(IndexOutput output, TreeNode node, int firstChildId, int lastChildId) - throws IOException { + private static void writeStarTreeNode(IndexOutput output, TreeNode node, int firstChildId, int lastChildId) throws IOException { output.writeInt(node.dimensionId); output.writeLong(node.dimensionValue); output.writeInt(node.startDocId); diff --git a/server/src/test/java/org/opensearch/index/codec/composite/datacube/startree/StarTreeDocValuesFormatTests.java b/server/src/test/java/org/opensearch/index/codec/composite/datacube/startree/StarTreeDocValuesFormatTests.java index 4d07d6afe0323..e7ec9f7fd107d 100644 --- a/server/src/test/java/org/opensearch/index/codec/composite/datacube/startree/StarTreeDocValuesFormatTests.java +++ b/server/src/test/java/org/opensearch/index/codec/composite/datacube/startree/StarTreeDocValuesFormatTests.java @@ -31,7 +31,11 @@ import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.index.MapperTestUtils; import org.opensearch.index.codec.composite.Composite99Codec; +import org.opensearch.index.compositeindex.datacube.DateDimension; +import org.opensearch.index.compositeindex.datacube.Dimension; +import org.opensearch.index.compositeindex.datacube.Metric; import org.opensearch.index.compositeindex.datacube.MetricStat; +import org.opensearch.index.compositeindex.datacube.NumericDimension; import org.opensearch.index.compositeindex.datacube.startree.StarTreeField; import org.opensearch.index.compositeindex.datacube.startree.StarTreeFieldConfiguration; import org.opensearch.index.mapper.MapperService; @@ -41,11 +45,6 @@ import org.junit.AfterClass; import org.junit.BeforeClass; -import org.opensearch.index.compositeindex.datacube.Metric; -import org.opensearch.index.compositeindex.datacube.Dimension; -import org.opensearch.index.compositeindex.datacube.DateDimension; -import org.opensearch.index.compositeindex.datacube.NumericDimension; - import java.io.IOException; import java.util.ArrayList; import java.util.Collections; diff --git a/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/OnHeapStarTreeBuilderTests.java b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/OnHeapStarTreeBuilderTests.java index d79d2ced98492..7c891f1780f8f 100644 --- a/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/OnHeapStarTreeBuilderTests.java +++ b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/OnHeapStarTreeBuilderTests.java @@ -214,7 +214,10 @@ public void test_sortAndAggregateStarTreeDocuments() throws IOException { segmentStarTreeDocuments[i] = new StarTreeDocument(starTreeDocuments[i].dimensions, metrics); } - Iterator segmentStarTreeDocumentIterator = builder.sortAndAggregateStarTreeDocuments(segmentStarTreeDocuments, false); + Iterator segmentStarTreeDocumentIterator = builder.sortAndAggregateStarTreeDocuments( + segmentStarTreeDocuments, + false + ); int numOfAggregatedDocuments = 0; while (segmentStarTreeDocumentIterator.hasNext() && expectedStarTreeDocumentIterator.hasNext()) { StarTreeDocument resultStarTreeDocument = segmentStarTreeDocumentIterator.next(); @@ -258,7 +261,10 @@ public void test_sortAndAggregateStarTreeDocuments_nullMetric() throws IOExcepti segmentStarTreeDocuments[i] = new StarTreeDocument(starTreeDocuments[i].dimensions, metrics); } - Iterator segmentStarTreeDocumentIterator = builder.sortAndAggregateStarTreeDocuments(segmentStarTreeDocuments, false); + Iterator segmentStarTreeDocumentIterator = builder.sortAndAggregateStarTreeDocuments( + segmentStarTreeDocuments, + false + ); StarTreeDocument resultStarTreeDocument = segmentStarTreeDocumentIterator.next(); for (int dim = 0; dim < 4; dim++) { @@ -318,7 +324,10 @@ public void test_sortAndAggregateStarTreeDocument_longMaxAndLongMinDimensions() segmentStarTreeDocuments[i] = new StarTreeDocument(starTreeDocuments[i].dimensions, metrics); } - Iterator segmentStarTreeDocumentIterator = builder.sortAndAggregateStarTreeDocuments(segmentStarTreeDocuments, false); + Iterator segmentStarTreeDocumentIterator = builder.sortAndAggregateStarTreeDocuments( + segmentStarTreeDocuments, + false + ); int numOfAggregatedDocuments = 0; while (segmentStarTreeDocumentIterator.hasNext() && expectedStarTreeDocumentIterator.hasNext()) { StarTreeDocument resultStarTreeDocument = segmentStarTreeDocumentIterator.next(); @@ -371,7 +380,10 @@ public void test_sortAndAggregateStarTreeDocument_DoubleMaxAndDoubleMinMetrics() segmentStarTreeDocuments[i] = new StarTreeDocument(starTreeDocuments[i].dimensions, metrics); } - Iterator segmentStarTreeDocumentIterator = builder.sortAndAggregateStarTreeDocuments(segmentStarTreeDocuments, false); + Iterator segmentStarTreeDocumentIterator = builder.sortAndAggregateStarTreeDocuments( + segmentStarTreeDocuments, + false + ); int numOfAggregatedDocuments = 0; while (segmentStarTreeDocumentIterator.hasNext() && expectedStarTreeDocumentIterator.hasNext()) { StarTreeDocument resultStarTreeDocument = segmentStarTreeDocumentIterator.next(); @@ -482,7 +494,10 @@ public void test_build_halfFloatMetrics() throws IOException { segmentStarTreeDocuments[i] = new StarTreeDocument(starTreeDocuments[i].dimensions, metrics); } - Iterator segmentStarTreeDocumentIterator = builder.sortAndAggregateStarTreeDocuments(segmentStarTreeDocuments, false); + Iterator segmentStarTreeDocumentIterator = builder.sortAndAggregateStarTreeDocuments( + segmentStarTreeDocuments, + false + ); builder.build(segmentStarTreeDocumentIterator, new AtomicInteger(), docValuesConsumer); List resultStarTreeDocuments = builder.getStarTreeDocuments(); assertEquals(7, resultStarTreeDocuments.size()); @@ -547,7 +562,10 @@ public void test_build_floatMetrics() throws IOException { segmentStarTreeDocuments[i] = new StarTreeDocument(starTreeDocuments[i].dimensions, metrics); } - Iterator segmentStarTreeDocumentIterator = builder.sortAndAggregateStarTreeDocuments(segmentStarTreeDocuments, false); + Iterator segmentStarTreeDocumentIterator = builder.sortAndAggregateStarTreeDocuments( + segmentStarTreeDocuments, + false + ); builder.build(segmentStarTreeDocumentIterator, new AtomicInteger(), docValuesConsumer); List resultStarTreeDocuments = builder.getStarTreeDocuments(); @@ -601,7 +619,10 @@ public void test_build_longMetrics() throws IOException { segmentStarTreeDocuments[i] = new StarTreeDocument(starTreeDocuments[i].dimensions, metrics); } - Iterator segmentStarTreeDocumentIterator = builder.sortAndAggregateStarTreeDocuments(segmentStarTreeDocuments, false); + Iterator segmentStarTreeDocumentIterator = builder.sortAndAggregateStarTreeDocuments( + segmentStarTreeDocuments, + false + ); builder.build(segmentStarTreeDocumentIterator, new AtomicInteger(), docValuesConsumer); List resultStarTreeDocuments = builder.getStarTreeDocuments(); @@ -644,7 +665,10 @@ public void test_build() throws IOException { segmentStarTreeDocuments[i] = new StarTreeDocument(starTreeDocuments[i].dimensions, metrics); } - Iterator segmentStarTreeDocumentIterator = builder.sortAndAggregateStarTreeDocuments(segmentStarTreeDocuments, false); + Iterator segmentStarTreeDocumentIterator = builder.sortAndAggregateStarTreeDocuments( + segmentStarTreeDocuments, + false + ); builder.build(segmentStarTreeDocumentIterator, new AtomicInteger(), docValuesConsumer); List resultStarTreeDocuments = builder.getStarTreeDocuments(); @@ -764,7 +788,10 @@ public void test_build_starTreeDataset() throws IOException { segmentStarTreeDocuments[i] = new StarTreeDocument(starTreeDocuments[i].dimensions, new Long[] { metric1 }); } - Iterator segmentStarTreeDocumentIterator = builder.sortAndAggregateStarTreeDocuments(segmentStarTreeDocuments, false); + Iterator segmentStarTreeDocumentIterator = builder.sortAndAggregateStarTreeDocuments( + segmentStarTreeDocuments, + false + ); builder.build(segmentStarTreeDocumentIterator); List resultStarTreeDocuments = builder.getStarTreeDocuments();