star-tree file formats reader and javadoc fixes
sarthakaggarwal97 committed Jul 8, 2024
1 parent 114a3ee commit 206f66f
Showing 35 changed files with 695 additions and 129 deletions.
@@ -10,6 +10,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.codecs.DocValuesProducer;
import org.apache.lucene.store.IndexOutput;
import org.opensearch.index.compositeindex.datacube.Dimension;
import org.opensearch.index.compositeindex.datacube.Metric;
import org.opensearch.index.compositeindex.datacube.MetricStat;
@@ -67,29 +68,41 @@ public abstract class BaseStarTreeBuilder implements StarTreeBuilder {
private final StarTreeField starTreeField;

private final MapperService mapperService;
private final SegmentWriteState state;
private final SegmentWriteState writeState;

private final IndexOutput metaOut;
private final IndexOutput dataOut;

/**
* Builds star tree based on star tree field configuration consisting of dimensions, metrics and star tree index specific configuration.
*
* @param starTreeField holds the configuration for the star tree
* @param state stores the segment write state
* @param mapperService helps to find the original type of the field
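* @param metaOut an index output to write star-tree metadata
* @param dataOut an index output to write star-tree data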
* @param starTreeField holds the configuration for the star tree
* @param writeState stores the segment write state
* @param mapperService helps to find the original type of the field
*/
protected BaseStarTreeBuilder(StarTreeField starTreeField, SegmentWriteState state, MapperService mapperService) throws IOException {
protected BaseStarTreeBuilder(
IndexOutput metaOut,
IndexOutput dataOut,
StarTreeField starTreeField,
SegmentWriteState writeState,
MapperService mapperService
) throws IOException {

logger.debug("Building star tree : {}", starTreeField);

this.metaOut = metaOut;
this.dataOut = dataOut;

this.starTreeField = starTreeField;
StarTreeFieldConfiguration starTreeFieldSpec = starTreeField.getStarTreeConfig();

List<Dimension> dimensionsSplitOrder = starTreeField.getDimensionsOrder();
this.numDimensions = dimensionsSplitOrder.size();

this.skipStarNodeCreationForDimensions = new HashSet<>();
this.totalSegmentDocs = state.segmentInfo.maxDoc();
this.totalSegmentDocs = writeState.segmentInfo.maxDoc();
this.mapperService = mapperService;
this.state = state;
this.writeState = writeState;

Set<String> skipStarNodeCreationForDimensions = starTreeFieldSpec.getSkipStarNodeCreationInDims();

@@ -142,19 +155,20 @@ public List<MetricAggregatorInfo> generateMetricAggregatorInfos(MapperService ma
*/
public List<SequentialDocValuesIterator> getMetricReaders(SegmentWriteState state, Map<String, DocValuesProducer> fieldProducerMap)
throws IOException {

List<SequentialDocValuesIterator> metricReaders = new ArrayList<>();
for (Metric metric : this.starTreeField.getMetrics()) {
for (MetricStat metricType : metric.getMetrics()) {
SequentialDocValuesIterator metricReader = null;

SequentialDocValuesIterator metricReader;
FieldInfo metricFieldInfo = state.fieldInfos.fieldInfo(metric.getField());
// TODO
// if (metricType != MetricStat.COUNT) {
// Need not initialize the metric reader for COUNT metric type
metricReader = new SequentialDocValuesIterator(
fieldProducerMap.get(metricFieldInfo.name).getSortedNumeric(metricFieldInfo)
);
// }
if (metricType != MetricStat.COUNT) {
// Need not initialize the metric reader with relevant doc id set iterator for COUNT metric type
metricReader = new SequentialDocValuesIterator(
fieldProducerMap.get(metricFieldInfo.name).getSortedNumeric(metricFieldInfo)
);
} else {
metricReader = new SequentialDocValuesIterator();
}

metricReaders.add(metricReader);
}
@@ -166,19 +180,18 @@ public List<SequentialDocValuesIterator> getMetricReaders(SegmentWriteState stat
* Builds the star tree from the original segment documents
*
* @param fieldProducerMap contains the docValues producer to get docValues associated with each field
*
* @throws IOException when we are unable to build star-tree
*/
public void build(Map<String, DocValuesProducer> fieldProducerMap) throws IOException {
long startTime = System.currentTimeMillis();
logger.debug("Star-tree build is a go with star tree field {}", starTreeField.getName());

List<SequentialDocValuesIterator> metricReaders = getMetricReaders(state, fieldProducerMap);
List<SequentialDocValuesIterator> metricReaders = getMetricReaders(writeState, fieldProducerMap);
List<Dimension> dimensionsSplitOrder = starTreeField.getDimensionsOrder();
SequentialDocValuesIterator[] dimensionReaders = new SequentialDocValuesIterator[dimensionsSplitOrder.size()];
for (int i = 0; i < numDimensions; i++) {
String dimension = dimensionsSplitOrder.get(i).getField();
FieldInfo dimensionFieldInfo = state.fieldInfos.fieldInfo(dimension);
FieldInfo dimensionFieldInfo = writeState.fieldInfos.fieldInfo(dimension);
dimensionReaders[i] = new SequentialDocValuesIterator(
fieldProducerMap.get(dimensionFieldInfo.name).getSortedNumeric(dimensionFieldInfo)
);
@@ -209,8 +222,8 @@ public void build(Iterator<StarTreeDocument> starTreeDocumentIterator) throws IO
logger.debug("Generated star tree docs : [{}] from segment docs : [{}]", numStarTreeDocument, numSegmentStarTreeDocument);

if (numStarTreeDocs == 0) {
// TODO: Uncomment when segment codec is ready
// StarTreeBuilderUtils.serializeTree(indexOutput, rootNode, dimensionsSplitOrder, numNodes);
// serialize the star tree data
serializeStarTree(numSegmentStarTreeDocument);
return;
}

@@ -228,10 +241,29 @@ public void build(Iterator<StarTreeDocument> starTreeDocumentIterator) throws IO

// TODO: When StarTree Codec is ready
// Create doc values indices in disk
// Serialize and save in disk

serializeStarTree(numSegmentStarTreeDocument);

// Write star tree metadata for off heap implementation
}

private void serializeStarTree(int numSegmentStarTreeDocument) throws IOException {
// serialize the star tree data
long dataFilePointer = dataOut.getFilePointer();
long totalStarTreeDataLength = StarTreeBuilderUtils.serializeStarTree(dataOut, rootNode, numStarTreeNodes);

// serialize the star tree meta
StarTreeBuilderUtils.serializeStarTreeMetadata(
metaOut,
starTreeField,
writeState,
metricAggregatorInfos,
numSegmentStarTreeDocument,
dataFilePointer,
totalStarTreeDataLength
);
}

/**
* Adds a document to the star-tree.
*
@@ -269,9 +301,9 @@ public void build(Iterator<StarTreeDocument> starTreeDocumentIterator) throws IO
* Sorts and aggregates all the documents in the segment as per the configuration, and returns a star-tree document iterator for all the
* aggregated star-tree documents.
*
* @param numDocs number of documents in the given segment
* @param numDocs number of documents in the given segment
* @param dimensionReaders List of docValues readers to read dimensions from the segment
* @param metricReaders List of docValues readers to read metrics from the segment
* @param metricReaders List of docValues readers to read metrics from the segment
* @return Iterator for the aggregated star-tree document
*/
public abstract Iterator<StarTreeDocument> sortAndAggregateSegmentDocuments(
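A note on the serializeStarTree method above: the star-tree data for a field is appended to a shared data output, and the meta output records the starting file pointer plus the serialized length. A minimal sketch of the assumed read-side counterpart follows; the helper name and the slicing approach are illustrative and not taken from this commit.

import org.apache.lucene.store.IndexInput;

import java.io.IOException;

// Hypothetical helper: carve one field's star-tree data out of the shared data file
// using the offset and length that serializeStarTree records in the meta file.
static IndexInput sliceStarTreeData(IndexInput dataIn, long dataFilePointer, long totalStarTreeDataLength)
    throws IOException {
    return dataIn.slice("star-tree-data", dataFilePointer, totalStarTreeDataLength);
}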
@@ -37,6 +37,30 @@ public class Composite90DocValuesFormat extends DocValuesFormat {
private final DocValuesFormat delegate;
private final MapperService mapperService;

/** Data codec name for Composite Doc Values Format */
public static final String DATA_CODEC_NAME = "Composite90FormatData";

/** Meta codec name for Composite Doc Values Format */
public static final String META_CODEC_NAME = "Composite90FormatMeta";

/** Filename extension for the composite index data */
public static final String DATA_EXTENSION = "sttd";

/** Filename extension for the composite index meta */
public static final String META_EXTENSION = "sttm";

/** Filename extension for the composite index data doc values */
public static final String DATA_DOC_VALUES_EXTENSION = "sttddvm";

/** Filename extension for the composite index meta doc values */
public static final String META_DOC_VALUES_EXTENSION = "sttmdvm";

/** Initial version for the Composite90DocValuesFormat */
public static final int VERSION_START = 0;

/** Current version for the Composite90DocValuesFormat */
public static final int VERSION_CURRENT = VERSION_START;

// needed for SPI
public Composite90DocValuesFormat() {
this(new Lucene90DocValuesFormat(), null);
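For context on how these constants fit together, here is a minimal sketch of the assumed write path: deriving the per-segment file name from the extension and stamping the codec header that the reader below verifies. The helper name and wiring are illustrative; the actual writer is not part of this diff.

import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.SegmentWriteState;
import org.apache.lucene.store.IndexOutput;

import java.io.IOException;

// Hypothetical helper: open the composite-index meta output and write its header.
static IndexOutput openMetaOutput(SegmentWriteState state) throws IOException {
    String metaFileName = IndexFileNames.segmentFileName(
        state.segmentInfo.name,
        state.segmentSuffix,
        Composite90DocValuesFormat.META_EXTENSION
    );
    IndexOutput metaOut = state.directory.createOutput(metaFileName, state.context);
    CodecUtil.writeIndexHeader(
        metaOut,
        Composite90DocValuesFormat.META_CODEC_NAME,
        Composite90DocValuesFormat.VERSION_CURRENT,
        state.segmentInfo.getId(),
        state.segmentSuffix
    );
    return metaOut; // caller writes the star-tree metadata and ends with CodecUtil.writeFooter(metaOut)
}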
@@ -8,22 +8,32 @@

package org.opensearch.index.codec.composite;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.codecs.DocValuesProducer;
import org.apache.lucene.index.BinaryDocValues;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.SegmentReadState;
import org.apache.lucene.index.SortedDocValues;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.index.SortedSetDocValues;
import org.apache.lucene.store.ChecksumIndexInput;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.util.IOUtils;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.index.mapper.CompositeMappedFieldType;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.compositeindex.CompositeIndexMetadata;
import org.opensearch.index.compositeindex.datacube.startree.node.OffHeapStarTree;
import org.opensearch.index.compositeindex.datacube.startree.node.StarTree;

import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Set;
import java.util.Map;

/**
* Reader for star tree index and star tree doc values from the segments
@@ -32,11 +42,95 @@
*/
@ExperimentalApi
public class Composite90DocValuesReader extends DocValuesProducer implements CompositeIndexReader {
private DocValuesProducer delegate;
private static final Logger logger = LogManager.getLogger(Composite90DocValuesReader.class);

public Composite90DocValuesReader(DocValuesProducer producer, SegmentReadState state) throws IOException {
private final DocValuesProducer delegate;
private final IndexInput dataIn;
private final ChecksumIndexInput metaIn;
private final Map<String, StarTree> starTreeMap = new LinkedHashMap<>();
private final Map<String, CompositeIndexMetadata> starTreeMetaMap = new LinkedHashMap<>();
private final List<CompositeIndexFieldInfo> compositeFieldInfos = new ArrayList<>();

public Composite90DocValuesReader(DocValuesProducer producer, SegmentReadState readState) throws IOException {
this.delegate = producer;
// TODO : read star tree files

String metaFileName = IndexFileNames.segmentFileName(
readState.segmentInfo.name,
readState.segmentSuffix,
Composite90DocValuesFormat.META_EXTENSION
);

String dataFileName = IndexFileNames.segmentFileName(
readState.segmentInfo.name,
readState.segmentSuffix,
Composite90DocValuesFormat.DATA_EXTENSION
);

boolean success = false;
try {

dataIn = readState.directory.openInput(dataFileName, readState.context);
CodecUtil.checkIndexHeader(
dataIn,
Composite90DocValuesFormat.DATA_CODEC_NAME,
Composite90DocValuesFormat.VERSION_START,
Composite90DocValuesFormat.VERSION_CURRENT,
readState.segmentInfo.getId(),
readState.segmentSuffix
);
CodecUtil.retrieveChecksum(dataIn);

metaIn = readState.directory.openChecksumInput(metaFileName, readState.context);
Throwable priorE = null;
try {
CodecUtil.checkIndexHeader(
metaIn,
Composite90DocValuesFormat.META_CODEC_NAME,
Composite90DocValuesFormat.VERSION_START,
Composite90DocValuesFormat.VERSION_CURRENT,
readState.segmentInfo.getId(),
readState.segmentSuffix
);

while (true) {
long magicMarker = metaIn.readLong();

if (magicMarker == -1) {
logger.info("EOF reached for composite index metadata");
break;
} else if (magicMarker < 0) {
throw new CorruptIndexException("Unknown token encountered: " + magicMarker, metaIn);
}
CompositeIndexMetadata compositeIndexMetadata = new CompositeIndexMetadata(metaIn, magicMarker);
compositeFieldInfos.add(
new CompositeIndexFieldInfo(
compositeIndexMetadata.getCompositeFieldName(),
compositeIndexMetadata.getCompositeFieldType()
)
);
switch (compositeIndexMetadata.getCompositeFieldType()) {
case STAR_TREE:
StarTree starTree = new OffHeapStarTree(dataIn, compositeIndexMetadata.getStarTreeMetadata());
starTreeMap.put(compositeIndexMetadata.getCompositeFieldName(), starTree);
starTreeMetaMap.put(compositeIndexMetadata.getCompositeFieldName(), compositeIndexMetadata);
break;
default:
throw new CorruptIndexException("Invalid composite field type found in the file", dataIn);
}
}
} catch (Throwable t) {
priorE = t;
} finally {
CodecUtil.checkFooter(metaIn, priorE);
}
CodecUtil.retrieveChecksum(dataIn);
success = true;
} finally {
if (success == false) {
IOUtils.closeWhileHandlingException(this);
}
}

}
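A short orientation note on the loop above: each composite field entry in the meta file is preceded by a non-negative long marker, the list is terminated by a -1 long, and the file ends with the standard Lucene footer that checkFooter validates. A minimal sketch of the assumed write-side tail follows; it is illustrative only, since the writer is not part of this diff.

import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.store.IndexOutput;

import java.io.IOException;

// Hypothetical helper: terminate the metadata entry list and close out the meta file.
static void finishMetaOutput(IndexOutput metaOut) throws IOException {
    metaOut.writeLong(-1L);          // EOF sentinel the reader's loop stops on
    CodecUtil.writeFooter(metaOut);  // checksum footer validated by CodecUtil.checkFooter
}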

@Override
@@ -67,25 +161,27 @@ public SortedSetDocValues getSortedSet(FieldInfo field) throws IOException {
@Override
public void checkIntegrity() throws IOException {
delegate.checkIntegrity();
// Todo : check integrity of composite index related [star tree] files
CodecUtil.checksumEntireFile(metaIn);
CodecUtil.checksumEntireFile(dataIn);
}

@Override
public void close() throws IOException {
delegate.close();
// Todo: close composite index related files [star tree] files
IOUtils.close(metaIn, dataIn);
starTreeMap.clear();
starTreeMetaMap.clear();
}

@Override
public List<CompositeIndexFieldInfo> getCompositeIndexFields() {
// todo : read from file formats and get the field names.
return new ArrayList<>();
return compositeFieldInfos;

}

@Override
public CompositeIndexValues getCompositeIndexValues(CompositeIndexFieldInfo compositeIndexFieldInfo) throws IOException {
// TODO : read compositeIndexValues [starTreeValues] from star tree files

throw new UnsupportedOperationException();
}
}