Skip to content

Commit

Permalink
nits and tests
Browse files Browse the repository at this point in the history
Signed-off-by: Sarthak Aggarwal <[email protected]>
  • Loading branch information
sarthakaggarwal97 committed Jul 19, 2024
1 parent 9250579 commit 7ded5ca
Show file tree
Hide file tree
Showing 9 changed files with 415 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public Long toStarTreeNumericTypeValue(Long value) {
}

@Override
public long getIdempotentMetricValue() {
public long getIdentityMetricValue() {
return 0L;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public Double toStarTreeNumericTypeValue(Long value) {
}

@Override
public long getIdempotentMetricValue() {
public long getIdentityMetricValue() {
return Long.MIN_VALUE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public Double toStarTreeNumericTypeValue(Long value) {
}

@Override
public long getIdempotentMetricValue() {
public long getIdentityMetricValue() {
return Long.MAX_VALUE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public Double toStarTreeNumericTypeValue(Long value) {
}

@Override
public long getIdempotentMetricValue() {
public long getIdentityMetricValue() {
return 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,5 +65,5 @@ public interface ValueAggregator<A> {
/**
* Fetches a value that does not alter the result of aggregations
*/
long getIdempotentMetricValue();
long getIdentityMetricValue();
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.util.Counter;
import org.apache.lucene.util.NumericUtils;
import org.opensearch.common.time.DateUtils;
import org.opensearch.index.compositeindex.datacube.Dimension;
import org.opensearch.index.compositeindex.datacube.Metric;
import org.opensearch.index.compositeindex.datacube.MetricStat;
Expand All @@ -42,7 +41,6 @@
import org.opensearch.index.mapper.NumberFieldMapper;

import java.io.IOException;
import java.time.temporal.ChronoField;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -269,35 +267,6 @@ public void build(
serializeStarTree(numSegmentStarTreeDocument);
}

private long getTimeStampVal(final String fieldName, final long val) {
long roundedDate = 0;
long ratio = 0;

switch (fieldName) {

case "@timestamp":
ratio = ChronoField.MINUTE_OF_HOUR.getBaseUnit().getDuration().toMillis();
roundedDate = DateUtils.roundFloor(val, ratio);
return roundedDate;
case "hour":
ratio = ChronoField.HOUR_OF_DAY.getBaseUnit().getDuration().toMillis();
roundedDate = DateUtils.roundFloor(val, ratio);
return roundedDate;
case "day":
ratio = ChronoField.DAY_OF_MONTH.getBaseUnit().getDuration().toMillis();
roundedDate = DateUtils.roundFloor(val, ratio);
return roundedDate;
case "month":
roundedDate = DateUtils.roundMonthOfYear(val);
return roundedDate;
case "year":
roundedDate = DateUtils.roundYear(val);
return roundedDate;
default:
return val;
}
}

private void serializeStarTree(int numSegmentStarTreeDocument) throws IOException {
// serialize the star tree data
long dataFilePointer = dataOut.getFilePointer();
Expand Down Expand Up @@ -581,7 +550,7 @@ protected StarTreeDocument reduceSegmentStarTreeDocuments(
metrics[i] = metricValueAggregator.getInitialAggregatedValue(segmentDocument.metrics[i]);
} else {
metrics[i] = metricValueAggregator.getInitialAggregatedValueForSegmentDocValue(
getLong(segmentDocument.metrics[i], metricValueAggregator.getIdempotentMetricValue()),
getLong(segmentDocument.metrics[i], metricValueAggregator.getIdentityMetricValue()),
starTreeNumericType
);
}
Expand All @@ -605,7 +574,7 @@ protected StarTreeDocument reduceSegmentStarTreeDocuments(
} else {
aggregatedSegmentDocument.metrics[i] = metricValueAggregator.mergeAggregatedValueAndSegmentValue(
aggregatedSegmentDocument.metrics[i],
getLong(segmentDocument.metrics[i], metricValueAggregator.getIdempotentMetricValue()),
getLong(segmentDocument.metrics[i], metricValueAggregator.getIdentityMetricValue()),
starTreeNumericType
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,15 @@ public class StarTreeDataWriter {
* @throws IOException if an I/O error occurs while writing the star-tree data
*/
public static long serializeStarTree(IndexOutput indexOutput, TreeNode rootNode, int numNodes) throws IOException {
int headerSizeInBytes = computeStarTreeDataHeaderByteSize();
long totalSizeInBytes = headerSizeInBytes + (long) numNodes * SERIALIZABLE_DATA_SIZE_IN_BYTES;

logger.debug("Star tree size in bytes : {}", totalSizeInBytes);
long totalDataSizeInBytes = (long) numNodes * SERIALIZABLE_DATA_SIZE_IN_BYTES;
if (logger.isDebugEnabled()) {
int headerSizeInBytes = computeStarTreeDataHeaderByteSize();
logger.debug("Star tree size in bytes : {}", headerSizeInBytes + totalDataSizeInBytes);
}

writeStarTreeHeader(indexOutput, numNodes);
writeStarTreeNodes(indexOutput, rootNode);
return totalSizeInBytes;
return totalDataSizeInBytes;
}

/**
Expand Down Expand Up @@ -95,7 +96,7 @@ private static void writeStarTreeNodes(IndexOutput output, TreeNode rootNode) th
while (!queue.isEmpty()) {
TreeNode node = queue.remove();

if (node.children == null) {
if (node.children == null || node.children.isEmpty()) {
writeStarTreeNode(output, node, ALL, ALL);
} else {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.compositeindex.datacube.startree.meta;

import org.apache.lucene.codecs.lucene99.Lucene99Codec;
import org.apache.lucene.index.DocValuesType;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.FieldInfos;
import org.apache.lucene.index.IndexOptions;
import org.apache.lucene.index.SegmentInfo;
import org.apache.lucene.index.SegmentWriteState;
import org.apache.lucene.index.VectorEncoding;
import org.apache.lucene.index.VectorSimilarityFunction;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.util.InfoStream;
import org.apache.lucene.util.Version;
import org.opensearch.index.compositeindex.CompositeIndexMetadata;
import org.opensearch.index.compositeindex.datacube.Dimension;
import org.opensearch.index.compositeindex.datacube.Metric;
import org.opensearch.index.compositeindex.datacube.MetricStat;
import org.opensearch.index.compositeindex.datacube.NumericDimension;
import org.opensearch.index.compositeindex.datacube.startree.StarTreeField;
import org.opensearch.index.compositeindex.datacube.startree.StarTreeFieldConfiguration;
import org.opensearch.index.compositeindex.datacube.startree.aggregators.MetricAggregatorInfo;
import org.opensearch.index.compositeindex.datacube.startree.aggregators.MetricEntry;
import org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeBuilderUtils;
import org.opensearch.index.fielddata.IndexNumericFieldData;
import org.opensearch.test.OpenSearchTestCase;
import org.junit.Before;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.UUID;

import static org.opensearch.index.compositeindex.CompositeIndexConstants.MAGIC_MARKER;
import static org.opensearch.index.mapper.CompositeMappedFieldType.CompositeFieldType.STAR_TREE;

public class StarTreeMetaTests extends OpenSearchTestCase {

private IndexOutput metaOut;
private IndexInput metaIn;
private StarTreeField starTreeField;
private SegmentWriteState writeState;
private Directory directory;
private FieldInfo[] fieldsInfo;
private List<Dimension> dimensionsOrder;
private List<String> fields = List.of();
private List<Metric> metrics;
private List<MetricAggregatorInfo> metricAggregatorInfos = new ArrayList<>();
private int segmentDocumentCount;
private long dataFilePointer;
private long dataFileLength;

@Before
public void setup() throws IOException {
fields = List.of("field1", "field2", "field3", "field4", "field5", "field6", "field7", "field8", "field9", "field10");
directory = newFSDirectory(createTempDir());
SegmentInfo segmentInfo = new SegmentInfo(
directory,
Version.LATEST,
Version.LUCENE_9_11_0,
"test_segment",
6,
false,
false,
new Lucene99Codec(),
new HashMap<>(),
UUID.randomUUID().toString().substring(0, 16).getBytes(StandardCharsets.UTF_8),
new HashMap<>(),
null
);

fieldsInfo = new FieldInfo[fields.size()];
for (int i = 0; i < fieldsInfo.length; i++) {
fieldsInfo[i] = new FieldInfo(
fields.get(i),
i,
false,
false,
true,
IndexOptions.DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS,
DocValuesType.SORTED_NUMERIC,
-1,
Collections.emptyMap(),
0,
0,
0,
0,
VectorEncoding.FLOAT32,
VectorSimilarityFunction.EUCLIDEAN,
false,
false
);
}
FieldInfos fieldInfos = new FieldInfos(fieldsInfo);
writeState = new SegmentWriteState(InfoStream.getDefault(), segmentInfo.dir, segmentInfo, fieldInfos, null, newIOContext(random()));
}

public void test_starTreeMetadata() throws IOException {
dimensionsOrder = List.of(
new NumericDimension("field1"),
new NumericDimension("field3"),
new NumericDimension("field5"),
new NumericDimension("field8")
);
metrics = List.of(
new Metric("field2", List.of(MetricStat.SUM)),
new Metric("field4", List.of(MetricStat.SUM)),
new Metric("field6", List.of(MetricStat.COUNT)),
new Metric("field9", List.of(MetricStat.MIN)),
new Metric("field10", List.of(MetricStat.MAX))
);
int maxLeafDocs = randomInt();
StarTreeFieldConfiguration starTreeFieldConfiguration = new StarTreeFieldConfiguration(
maxLeafDocs,
new HashSet<>(),
StarTreeFieldConfiguration.StarTreeBuildMode.ON_HEAP
);
starTreeField = new StarTreeField("star_tree", dimensionsOrder, metrics, starTreeFieldConfiguration);

for (Metric metric : metrics) {
for (MetricStat metricType : metric.getMetrics()) {
MetricAggregatorInfo metricAggregatorInfo = new MetricAggregatorInfo(
metricType,
metric.getField(),
starTreeField.getName(),
IndexNumericFieldData.NumericType.DOUBLE
);
metricAggregatorInfos.add(metricAggregatorInfo);
}
}

dataFileLength = randomLong();
dataFilePointer = randomLong();
segmentDocumentCount = randomInt();
metaOut = directory.createOutput("star-tree-metadata", IOContext.DEFAULT);
StarTreeBuilderUtils.serializeStarTreeMetadata(
metaOut,
starTreeField,
writeState,
metricAggregatorInfos,
segmentDocumentCount,
dataFilePointer,
dataFileLength
);
metaOut.close();
metaIn = directory.openInput("star-tree-metadata", IOContext.READONCE);
assertEquals(MAGIC_MARKER, metaIn.readLong());

CompositeIndexMetadata compositeIndexMetadata = new CompositeIndexMetadata(metaIn);
assertEquals(starTreeField.getName(), compositeIndexMetadata.getCompositeFieldName());
assertEquals(STAR_TREE, compositeIndexMetadata.getCompositeFieldType());

StarTreeMetadata starTreeMetadata = compositeIndexMetadata.getStarTreeMetadata();
assertNotNull(starTreeMetadata);

for (int i = 0; i < dimensionsOrder.size(); i++) {
assertEquals(
writeState.fieldInfos.fieldInfo(dimensionsOrder.get(i).getField()).getFieldNumber(),
starTreeMetadata.getDimensionFieldNumbers().get(i),
0
);
}

for (int i = 0; i < metricAggregatorInfos.size(); i++) {
MetricEntry metricEntry = starTreeMetadata.getMetricEntries().get(i);
assertEquals(metricAggregatorInfos.get(i).getField(), metricEntry.getMetricName());
assertEquals(metricAggregatorInfos.get(i).getMetricStat(), metricEntry.getMetricStat());
}
assertEquals(segmentDocumentCount, starTreeMetadata.getSegmentAggregatedDocCount(), 0);
assertEquals(maxLeafDocs, starTreeMetadata.getMaxLeafDocs(), 0);
assertEquals(
starTreeFieldConfiguration.getSkipStarNodeCreationInDims().size(),
starTreeMetadata.getSkipStarNodeCreationInDims().size()
);
for (String skipStarNodeCreationInDims : starTreeField.getStarTreeConfig().getSkipStarNodeCreationInDims()) {
Integer skipStarNodeCreationInDimsFieldNumber = writeState.fieldInfos.fieldInfo(skipStarNodeCreationInDims).getFieldNumber();
assertTrue(starTreeMetadata.getSkipStarNodeCreationInDims().contains(skipStarNodeCreationInDimsFieldNumber));
}
assertEquals(starTreeFieldConfiguration.getBuildMode(), starTreeMetadata.getStarTreeBuildMode());
assertEquals(dataFileLength, starTreeMetadata.getDataLength());
assertEquals(dataFilePointer, starTreeMetadata.getDataStartFilePointer());

metaIn.close();

}

@Override
public void tearDown() throws Exception {
super.tearDown();
metaOut.close();
metaIn.close();
directory.close();
}

}
Loading

0 comments on commit 7ded5ca

Please sign in to comment.