Skip to content

Commit

Permalink
improvements and fixes
Browse files Browse the repository at this point in the history
Signed-off-by: Sarthak Aggarwal <[email protected]>
  • Loading branch information
sarthakaggarwal97 committed Jul 18, 2024
1 parent d1d6c85 commit faf2a1f
Show file tree
Hide file tree
Showing 13 changed files with 84 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,10 @@ public class SortedNumericDocValuesWriterHelper {
* @param fieldInfo the field information for the field being written
* @param counter a counter for tracking memory usage
*/
public SortedNumericDocValuesWriterHelper(FieldInfo fieldInfo, Counter counter){
public SortedNumericDocValuesWriterHelper(FieldInfo fieldInfo, Counter counter) {
sortedNumericDocValuesWriter = new SortedNumericDocValuesWriter(fieldInfo, counter);
}


/**
* Adds a value to the sorted numeric doc values for the specified document.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public Composite99DocValuesReader(DocValuesProducer producer, SegmentReadState r

while (true) {

//validate magic marker
// validate magic marker
long magicMarker = metaIn.readLong();
if (magicMarker == -1) {
logger.info("EOF reached for composite index metadata");
Expand Down Expand Up @@ -236,8 +236,7 @@ public List<CompositeIndexFieldInfo> getCompositeIndexFields() {
}

@Override
public CompositeIndexValues getCompositeIndexValues(CompositeIndexFieldInfo compositeIndexFieldInfo) throws
IOException {
public CompositeIndexValues getCompositeIndexValues(CompositeIndexFieldInfo compositeIndexFieldInfo) throws IOException {

switch (compositeIndexFieldInfo.getType()) {
case STAR_TREE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ public class Composite99DocValuesWriter extends DocValuesConsumer {
private final Map<String, DocValuesProducer> fieldProducerMap = new HashMap<>();
private static final Logger logger = LogManager.getLogger(Composite99DocValuesWriter.class);

public Composite99DocValuesWriter(DocValuesConsumer delegate, SegmentWriteState segmentWriteState, MapperService mapperService) throws IOException {
public Composite99DocValuesWriter(DocValuesConsumer delegate, SegmentWriteState segmentWriteState, MapperService mapperService)
throws IOException {

this.delegate = delegate;
this.state = segmentWriteState;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,9 @@ public Long toLongValue(Long value) {
public Long toStarTreeNumericTypeValue(Long value) {
return value;
}

@Override
public long getIdempotentMetricValue() {
return 0L;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,9 @@ public Double toStarTreeNumericTypeValue(Long value) {
throw new IllegalStateException("Cannot convert " + value + " to sortable aggregation type", e);
}
}

@Override
public long getIdempotentMetricValue() {
return Long.MIN_VALUE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,9 @@ public Double toStarTreeNumericTypeValue(Long value) {
throw new IllegalStateException("Cannot convert " + value + " to sortable aggregation type", e);
}
}

@Override
public long getIdempotentMetricValue() {
return Long.MAX_VALUE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,4 +94,9 @@ public Double toStarTreeNumericTypeValue(Long value) {
throw new IllegalStateException("Cannot convert " + value + " to sortable aggregation type", e);
}
}

@Override
public long getIdempotentMetricValue() {
return 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,9 @@ public interface ValueAggregator<A> {
* Converts an aggregated value from a Long type.
*/
A toStarTreeNumericTypeValue(Long rawValue);

/**
* Fetches an value that does not alter the result of aggregations
*/
long getIdempotentMetricValue();
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
*/
package org.opensearch.index.compositeindex.datacube.startree.builder;

import java.time.temporal.ChronoField;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.codecs.DocValuesConsumer;
Expand All @@ -21,12 +20,10 @@
import org.apache.lucene.index.SortedNumericDocValuesWriterHelper;
import org.apache.lucene.index.VectorEncoding;
import org.apache.lucene.index.VectorSimilarityFunction;
import org.opensearch.common.time.DateUtils;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.util.Counter;
import org.apache.lucene.util.NumericUtils;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.SegmentWriteState;
import org.opensearch.common.time.DateUtils;
import org.opensearch.index.compositeindex.datacube.Dimension;
import org.opensearch.index.compositeindex.datacube.Metric;
import org.opensearch.index.compositeindex.datacube.MetricStat;
Expand All @@ -45,6 +42,7 @@
import org.opensearch.index.mapper.NumberFieldMapper;

import java.io.IOException;
import java.time.temporal.ChronoField;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -58,7 +56,6 @@

import static org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeHelper.fullFieldNameForStarTreeDimensionsDocValues;
import static org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeHelper.fullFieldNameForStarTreeMetricsDocValues;

import static org.opensearch.index.compositeindex.datacube.startree.utils.TreeNode.ALL;

/**
Expand Down Expand Up @@ -220,10 +217,7 @@ public void build(
fieldProducerMap.get(dimensionFieldInfo.name).getSortedNumeric(dimensionFieldInfo)
);
}
Iterator<StarTreeDocument> starTreeDocumentIterator = sortAndAggregateSegmentDocuments(
dimensionReaders,
metricReaders
);
Iterator<StarTreeDocument> starTreeDocumentIterator = sortAndAggregateSegmentDocuments(dimensionReaders, metricReaders);
logger.debug("Sorting and aggregating star-tree in ms : {}", (System.currentTimeMillis() - startTime));
build(starTreeDocumentIterator, fieldNumberAcrossStarTrees, starTreeDocValuesConsumer);
logger.debug("Finished Building star-tree in ms : {}", (System.currentTimeMillis() - startTime));
Expand Down Expand Up @@ -445,7 +439,6 @@ public SortedNumericDocValues getSortedNumeric(FieldInfo field) {
*/
public abstract StarTreeDocument getStarTreeDocument(int docId) throws IOException;


public abstract StarTreeDocument getStarTreeDocumentForCreatingDocValues(int docId) throws IOException;

/**
Expand Down Expand Up @@ -490,7 +483,6 @@ public abstract Iterator<StarTreeDocument> generateStarTreeDocumentsForStarNode(

/**
* Returns the star-tree document from the segment based on the current doc id
*
*/
protected StarTreeDocument getSegmentStarTreeDocument(
int currentDocId,
Expand Down Expand Up @@ -582,7 +574,7 @@ protected StarTreeDocument reduceSegmentStarTreeDocuments(
metrics[i] = metricValueAggregator.getInitialAggregatedValue(segmentDocument.metrics[i]);
} else {
metrics[i] = metricValueAggregator.getInitialAggregatedValueForSegmentDocValue(
getLong(segmentDocument.metrics[i]),
getLong(segmentDocument.metrics[i], metricValueAggregator.getIdempotentMetricValue()),
starTreeNumericType
);
}
Expand All @@ -606,7 +598,7 @@ protected StarTreeDocument reduceSegmentStarTreeDocuments(
} else {
aggregatedSegmentDocument.metrics[i] = metricValueAggregator.mergeAggregatedValueAndSegmentValue(
aggregatedSegmentDocument.metrics[i],
getLong(segmentDocument.metrics[i]),
getLong(segmentDocument.metrics[i], metricValueAggregator.getIdempotentMetricValue()),
starTreeNumericType
);
}
Expand All @@ -622,25 +614,26 @@ protected StarTreeDocument reduceSegmentStarTreeDocuments(
/**
* Safely converts the metric value of object type to long.
*
* @param metric value of the metric
* @param metric value of the metric
* @param idempotentMetricValue
* @return converted metric value to long
*/
private static long getLong(Object metric) {
private static long getLong(Object metric, long idempotentMetricValue) {

Long metricValue = null;
Long metricValue;
try {
if (metric instanceof Long) {
metricValue = (long) metric;
} else if (metric != null) {
metricValue = Long.valueOf(String.valueOf(metric));
} else {
logger.debug("metric value is null, returning idempotent metric value for the aggregator");
return idempotentMetricValue;
}
} catch (Exception e) {
throw new IllegalStateException("unable to cast segment metric", e);
}

if (metricValue == null) {
throw new IllegalStateException("unable to cast segment metric");
}
return metricValue;
}

Expand Down Expand Up @@ -688,7 +681,6 @@ public StarTreeDocument reduceStarTreeDocuments(StarTreeDocument aggregatedDocum
* Builds the star tree from the original segment documents
*
* @param fieldProducerMap contain s the docValues producer to get docValues associated with each field
*
* @throws IOException when we are unable to build star-tree
*/
public void build(Map<String, DocValuesProducer> fieldProducerMap) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,7 @@ public OffHeapStarTree(IndexInput data, StarTreeMetadata starTreeMetadata) throw
}
numNodes = data.readInt(); // num nodes

RandomAccessInput in = data.randomAccessSlice(
data.getFilePointer(),
starTreeMetadata.getDataLength()
);
RandomAccessInput in = data.randomAccessSlice(data.getFilePointer(), starTreeMetadata.getDataLength());
root = new OffHeapStarTreeNode(in, 0);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,7 @@ private static void writeStarTreeNodes(IndexOutput output, TreeNode rootNode) th
* @param lastChildId the ID of the last child node
* @throws IOException if an I/O error occurs while writing the node
*/
private static void writeStarTreeNode(IndexOutput output, TreeNode node, int firstChildId, int lastChildId)
throws IOException {
private static void writeStarTreeNode(IndexOutput output, TreeNode node, int firstChildId, int lastChildId) throws IOException {
output.writeInt(node.dimensionId);
output.writeLong(node.dimensionValue);
output.writeInt(node.startDocId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.MapperTestUtils;
import org.opensearch.index.codec.composite.Composite99Codec;
import org.opensearch.index.compositeindex.datacube.DateDimension;
import org.opensearch.index.compositeindex.datacube.Dimension;
import org.opensearch.index.compositeindex.datacube.Metric;
import org.opensearch.index.compositeindex.datacube.MetricStat;
import org.opensearch.index.compositeindex.datacube.NumericDimension;
import org.opensearch.index.compositeindex.datacube.startree.StarTreeField;
import org.opensearch.index.compositeindex.datacube.startree.StarTreeFieldConfiguration;
import org.opensearch.index.mapper.MapperService;
Expand All @@ -41,11 +45,6 @@
import org.junit.AfterClass;
import org.junit.BeforeClass;

import org.opensearch.index.compositeindex.datacube.Metric;
import org.opensearch.index.compositeindex.datacube.Dimension;
import org.opensearch.index.compositeindex.datacube.DateDimension;
import org.opensearch.index.compositeindex.datacube.NumericDimension;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,10 @@ public void test_sortAndAggregateStarTreeDocuments() throws IOException {
segmentStarTreeDocuments[i] = new StarTreeDocument(starTreeDocuments[i].dimensions, metrics);
}

Iterator<StarTreeDocument> segmentStarTreeDocumentIterator = builder.sortAndAggregateStarTreeDocuments(segmentStarTreeDocuments, false);
Iterator<StarTreeDocument> segmentStarTreeDocumentIterator = builder.sortAndAggregateStarTreeDocuments(
segmentStarTreeDocuments,
false
);
int numOfAggregatedDocuments = 0;
while (segmentStarTreeDocumentIterator.hasNext() && expectedStarTreeDocumentIterator.hasNext()) {
StarTreeDocument resultStarTreeDocument = segmentStarTreeDocumentIterator.next();
Expand Down Expand Up @@ -258,7 +261,10 @@ public void test_sortAndAggregateStarTreeDocuments_nullMetric() throws IOExcepti
segmentStarTreeDocuments[i] = new StarTreeDocument(starTreeDocuments[i].dimensions, metrics);
}

Iterator<StarTreeDocument> segmentStarTreeDocumentIterator = builder.sortAndAggregateStarTreeDocuments(segmentStarTreeDocuments, false);
Iterator<StarTreeDocument> segmentStarTreeDocumentIterator = builder.sortAndAggregateStarTreeDocuments(
segmentStarTreeDocuments,
false
);

StarTreeDocument resultStarTreeDocument = segmentStarTreeDocumentIterator.next();
for (int dim = 0; dim < 4; dim++) {
Expand Down Expand Up @@ -318,7 +324,10 @@ public void test_sortAndAggregateStarTreeDocument_longMaxAndLongMinDimensions()
segmentStarTreeDocuments[i] = new StarTreeDocument(starTreeDocuments[i].dimensions, metrics);
}

Iterator<StarTreeDocument> segmentStarTreeDocumentIterator = builder.sortAndAggregateStarTreeDocuments(segmentStarTreeDocuments, false);
Iterator<StarTreeDocument> segmentStarTreeDocumentIterator = builder.sortAndAggregateStarTreeDocuments(
segmentStarTreeDocuments,
false
);
int numOfAggregatedDocuments = 0;
while (segmentStarTreeDocumentIterator.hasNext() && expectedStarTreeDocumentIterator.hasNext()) {
StarTreeDocument resultStarTreeDocument = segmentStarTreeDocumentIterator.next();
Expand Down Expand Up @@ -371,7 +380,10 @@ public void test_sortAndAggregateStarTreeDocument_DoubleMaxAndDoubleMinMetrics()
segmentStarTreeDocuments[i] = new StarTreeDocument(starTreeDocuments[i].dimensions, metrics);
}

Iterator<StarTreeDocument> segmentStarTreeDocumentIterator = builder.sortAndAggregateStarTreeDocuments(segmentStarTreeDocuments, false);
Iterator<StarTreeDocument> segmentStarTreeDocumentIterator = builder.sortAndAggregateStarTreeDocuments(
segmentStarTreeDocuments,
false
);
int numOfAggregatedDocuments = 0;
while (segmentStarTreeDocumentIterator.hasNext() && expectedStarTreeDocumentIterator.hasNext()) {
StarTreeDocument resultStarTreeDocument = segmentStarTreeDocumentIterator.next();
Expand Down Expand Up @@ -482,7 +494,10 @@ public void test_build_halfFloatMetrics() throws IOException {
segmentStarTreeDocuments[i] = new StarTreeDocument(starTreeDocuments[i].dimensions, metrics);
}

Iterator<StarTreeDocument> segmentStarTreeDocumentIterator = builder.sortAndAggregateStarTreeDocuments(segmentStarTreeDocuments, false);
Iterator<StarTreeDocument> segmentStarTreeDocumentIterator = builder.sortAndAggregateStarTreeDocuments(
segmentStarTreeDocuments,
false
);
builder.build(segmentStarTreeDocumentIterator, new AtomicInteger(), docValuesConsumer);
List<StarTreeDocument> resultStarTreeDocuments = builder.getStarTreeDocuments();
assertEquals(7, resultStarTreeDocuments.size());
Expand Down Expand Up @@ -547,7 +562,10 @@ public void test_build_floatMetrics() throws IOException {
segmentStarTreeDocuments[i] = new StarTreeDocument(starTreeDocuments[i].dimensions, metrics);
}

Iterator<StarTreeDocument> segmentStarTreeDocumentIterator = builder.sortAndAggregateStarTreeDocuments(segmentStarTreeDocuments, false);
Iterator<StarTreeDocument> segmentStarTreeDocumentIterator = builder.sortAndAggregateStarTreeDocuments(
segmentStarTreeDocuments,
false
);
builder.build(segmentStarTreeDocumentIterator, new AtomicInteger(), docValuesConsumer);

List<StarTreeDocument> resultStarTreeDocuments = builder.getStarTreeDocuments();
Expand Down Expand Up @@ -601,7 +619,10 @@ public void test_build_longMetrics() throws IOException {
segmentStarTreeDocuments[i] = new StarTreeDocument(starTreeDocuments[i].dimensions, metrics);
}

Iterator<StarTreeDocument> segmentStarTreeDocumentIterator = builder.sortAndAggregateStarTreeDocuments(segmentStarTreeDocuments, false);
Iterator<StarTreeDocument> segmentStarTreeDocumentIterator = builder.sortAndAggregateStarTreeDocuments(
segmentStarTreeDocuments,
false
);
builder.build(segmentStarTreeDocumentIterator, new AtomicInteger(), docValuesConsumer);

List<StarTreeDocument> resultStarTreeDocuments = builder.getStarTreeDocuments();
Expand Down Expand Up @@ -644,7 +665,10 @@ public void test_build() throws IOException {
segmentStarTreeDocuments[i] = new StarTreeDocument(starTreeDocuments[i].dimensions, metrics);
}

Iterator<StarTreeDocument> segmentStarTreeDocumentIterator = builder.sortAndAggregateStarTreeDocuments(segmentStarTreeDocuments, false);
Iterator<StarTreeDocument> segmentStarTreeDocumentIterator = builder.sortAndAggregateStarTreeDocuments(
segmentStarTreeDocuments,
false
);
builder.build(segmentStarTreeDocumentIterator, new AtomicInteger(), docValuesConsumer);

List<StarTreeDocument> resultStarTreeDocuments = builder.getStarTreeDocuments();
Expand Down Expand Up @@ -764,7 +788,10 @@ public void test_build_starTreeDataset() throws IOException {
segmentStarTreeDocuments[i] = new StarTreeDocument(starTreeDocuments[i].dimensions, new Long[] { metric1 });
}

Iterator<StarTreeDocument> segmentStarTreeDocumentIterator = builder.sortAndAggregateStarTreeDocuments(segmentStarTreeDocuments, false);
Iterator<StarTreeDocument> segmentStarTreeDocumentIterator = builder.sortAndAggregateStarTreeDocuments(
segmentStarTreeDocuments,
false
);
builder.build(segmentStarTreeDocumentIterator);

List<StarTreeDocument> resultStarTreeDocuments = builder.getStarTreeDocuments();
Expand Down

0 comments on commit faf2a1f

Please sign in to comment.