Skip to content

Commit

Permalink
[Star tree] Performance optimizations during flush flow (#16037)
Browse files Browse the repository at this point in the history
---------

Signed-off-by: Bharathwaj G <[email protected]>
  • Loading branch information
bharath-techie authored Oct 17, 2024
1 parent 0c3e3c0 commit dc8a435
Show file tree
Hide file tree
Showing 7 changed files with 184 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@

package org.opensearch.common.util;

import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.RandomAccessInput;

import java.io.IOException;
import java.nio.ByteBuffer;

/**
* A bitset backed by a byte array. This will initialize and set bits in the byte array based on the index.
Expand All @@ -39,18 +38,6 @@ public ByteArrayBackedBitset(RandomAccessInput in, long offset, int length) thro
}
}

/**
* Constructor which set the Lucene's IndexInput to read the bitset into a read-only buffer.
*/
public ByteArrayBackedBitset(IndexInput in, int length) throws IOException {
byteArray = new byte[length];
int i = 0;
while (i < length) {
byteArray[i] = in.readByte();
i++;
}
}

/**
* Sets the bit at the given index to 1.
* Each byte can indicate 8 bits, so the index is divided by 8 to get the byte array index.
Expand All @@ -61,10 +48,10 @@ public void set(int index) {
byteArray[byteArrIndex] |= (byte) (1 << (index & 7));
}

public int write(IndexOutput output) throws IOException {
public int write(ByteBuffer output) throws IOException {
int numBytes = 0;
for (Byte bitSet : byteArray) {
output.writeByte(bitSet);
output.put(bitSet);
numBytes += Byte.BYTES;
}
return numBytes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import org.apache.lucene.store.TrackingDirectoryWrapper;
import org.apache.lucene.util.NumericUtils;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.index.compositeindex.datacube.Metric;
import org.opensearch.index.compositeindex.datacube.MetricStat;
import org.opensearch.index.compositeindex.datacube.startree.StarTreeDocument;
import org.opensearch.index.compositeindex.datacube.startree.StarTreeField;
import org.opensearch.index.compositeindex.datacube.startree.aggregators.MetricAggregatorInfo;
Expand All @@ -24,6 +26,8 @@

import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.List;

import static org.opensearch.index.mapper.NumberFieldMapper.NumberType.DOUBLE;
Expand Down Expand Up @@ -67,54 +71,84 @@ private void setDocSizeInBytes(int numBytes) {
}

/**
* Write the star tree document to file associated with dimensions and metrics
* Write the star tree document to a byte buffer
*/
protected int writeStarTreeDocument(StarTreeDocument starTreeDocument, IndexOutput output, boolean isAggregatedDoc) throws IOException {
int numBytes = writeDimensions(starTreeDocument, output);
numBytes += writeMetrics(starTreeDocument, output, isAggregatedDoc);
int numBytes = calculateDocumentSize(starTreeDocument, isAggregatedDoc);
byte[] bytes = new byte[numBytes];
ByteBuffer buffer = ByteBuffer.wrap(bytes).order(ByteOrder.nativeOrder());
writeDimensions(starTreeDocument, buffer);
if (isAggregatedDoc == false) {
writeFlushMetrics(starTreeDocument, buffer);
} else {
writeMetrics(starTreeDocument, buffer, isAggregatedDoc);
}
output.writeBytes(bytes, bytes.length);
setDocSizeInBytes(numBytes);
return numBytes;
return bytes.length;
}

/**
* Write dimensions to file
* Write dimensions to the byte buffer
*/
protected int writeDimensions(StarTreeDocument starTreeDocument, IndexOutput output) throws IOException {
int numBytes = 0;
for (int i = 0; i < starTreeDocument.dimensions.length; i++) {
output.writeLong(starTreeDocument.dimensions[i] == null ? 0L : starTreeDocument.dimensions[i]);
numBytes += Long.BYTES;
protected void writeDimensions(StarTreeDocument starTreeDocument, ByteBuffer buffer) throws IOException {
for (Long dimension : starTreeDocument.dimensions) {
buffer.putLong(dimension == null ? 0L : dimension);
}
numBytes += StarTreeDocumentBitSetUtil.writeBitSet(starTreeDocument.dimensions, output);
return numBytes;
StarTreeDocumentBitSetUtil.writeBitSet(starTreeDocument.dimensions, buffer);
}

/**
* Write star tree document metrics to file
*/
protected int writeMetrics(StarTreeDocument starTreeDocument, IndexOutput output, boolean isAggregatedDoc) throws IOException {
int numBytes = 0;
protected void writeFlushMetrics(StarTreeDocument starTreeDocument, ByteBuffer buffer) throws IOException {
for (int i = 0; i < starTreeDocument.metrics.length; i++) {
buffer.putLong(starTreeDocument.metrics[i] == null ? 0L : (Long) starTreeDocument.metrics[i]);
}
StarTreeDocumentBitSetUtil.writeBitSet(starTreeDocument.metrics, buffer);
}

/**
* Write star tree document metrics to the byte buffer
*/
protected void writeMetrics(StarTreeDocument starTreeDocument, ByteBuffer buffer, boolean isAggregatedDoc) throws IOException {
for (int i = 0; i < starTreeDocument.metrics.length; i++) {
FieldValueConverter aggregatedValueType = metricAggregatorInfos.get(i).getValueAggregators().getAggregatedValueType();
if (aggregatedValueType.equals(LONG)) {
output.writeLong(starTreeDocument.metrics[i] == null ? 0L : (Long) starTreeDocument.metrics[i]);
numBytes += Long.BYTES;
buffer.putLong(starTreeDocument.metrics[i] == null ? 0L : (Long) starTreeDocument.metrics[i]);
} else if (aggregatedValueType.equals(DOUBLE)) {
if (isAggregatedDoc) {
long val = NumericUtils.doubleToSortableLong(
starTreeDocument.metrics[i] == null ? 0.0 : (Double) starTreeDocument.metrics[i]
);
output.writeLong(val);
buffer.putLong(val);
} else {
output.writeLong(starTreeDocument.metrics[i] == null ? 0L : (Long) starTreeDocument.metrics[i]);
buffer.putLong(starTreeDocument.metrics[i] == null ? 0L : (Long) starTreeDocument.metrics[i]);
}
numBytes += Long.BYTES;
} else {
throw new IllegalStateException("Unsupported metric type");
}
}
numBytes += StarTreeDocumentBitSetUtil.writeBitSet(starTreeDocument.metrics, output);
return numBytes;
StarTreeDocumentBitSetUtil.writeBitSet(starTreeDocument.metrics, buffer);
}

/**
* Calculate the size of the serialized StarTreeDocument
*/
private int calculateDocumentSize(StarTreeDocument starTreeDocument, boolean isAggregatedDoc) {
int size = starTreeDocument.dimensions.length * Long.BYTES;
size += getLength(starTreeDocument.dimensions);

for (int i = 0; i < starTreeDocument.metrics.length; i++) {
size += Long.BYTES;
}
size += getLength(starTreeDocument.metrics);

return size;
}

private static int getLength(Object[] array) {
return (array.length / 8) + (array.length % 8 == 0 ? 0 : 1);
}

/**
Expand All @@ -132,7 +166,11 @@ protected StarTreeDocument readStarTreeDocument(RandomAccessInput input, long of
offset = readDimensions(dimensions, input, offset);

Object[] metrics = new Object[numMetrics];
offset = readMetrics(input, offset, numMetrics, metrics, isAggregatedDoc);
if (isAggregatedDoc == false) {
offset = readMetrics(input, offset, metrics);
} else {
offset = readMetrics(input, offset, numMetrics, metrics, isAggregatedDoc);
}
assert (offset - initialOffset) == docSizeInBytes;
return new StarTreeDocument(dimensions, metrics);
}
Expand All @@ -154,10 +192,32 @@ protected long readDimensions(Long[] dimensions, RandomAccessInput input, long o
return offset;
}

/**
* Read metrics based on metric field values. Then we reuse the metric field values to each of the metric stats.
*/
private long readMetrics(RandomAccessInput input, long offset, Object[] metrics) throws IOException {
Object[] fieldMetrics = new Object[starTreeField.getMetrics().size()];
for (int i = 0; i < starTreeField.getMetrics().size(); i++) {
fieldMetrics[i] = input.readLong(offset);
offset += Long.BYTES;
}
offset += StarTreeDocumentBitSetUtil.readBitSet(input, offset, fieldMetrics, index -> null);
int fieldIndex = 0;
int numMetrics = 0;
for (Metric metric : starTreeField.getMetrics()) {
for (MetricStat stat : metric.getBaseMetrics()) {
metrics[numMetrics] = fieldMetrics[fieldIndex];
numMetrics++;
}
fieldIndex++;
}
return offset;
}

/**
* Read star tree metrics from file
*/
protected long readMetrics(RandomAccessInput input, long offset, int numMetrics, Object[] metrics, boolean isAggregatedDoc)
private long readMetrics(RandomAccessInput input, long offset, int numMetrics, Object[] metrics, boolean isAggregatedDoc)
throws IOException {
for (int i = 0; i < numMetrics; i++) {
FieldValueConverter aggregatedValueType = metricAggregatorInfos.get(i).getValueAggregators().getAggregatedValueType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ public List<MetricAggregatorInfo> generateMetricAggregatorInfos(MapperService ma

/**
* Generates the configuration required to perform aggregation for all the metrics on a field
* Each metric field is associated with a metric reader
*
* @return list of MetricAggregatorInfo
*/
Expand All @@ -191,24 +192,20 @@ public List<SequentialDocValuesIterator> getMetricReaders(SegmentWriteState stat

List<SequentialDocValuesIterator> metricReaders = new ArrayList<>();
for (Metric metric : this.starTreeField.getMetrics()) {
for (MetricStat metricStat : metric.getBaseMetrics()) {
SequentialDocValuesIterator metricReader;
FieldInfo metricFieldInfo = state.fieldInfos.fieldInfo(metric.getField());
if (metricStat.equals(MetricStat.DOC_COUNT)) {
// _doc_count is numeric field , so we convert to sortedNumericDocValues and get iterator
metricReader = getIteratorForNumericField(fieldProducerMap, metricFieldInfo, DocCountFieldMapper.NAME);
} else {
if (metricFieldInfo == null) {
metricFieldInfo = getFieldInfo(metric.getField(), DocValuesType.SORTED_NUMERIC);
}
metricReader = new SequentialDocValuesIterator(
new SortedNumericStarTreeValuesIterator(
fieldProducerMap.get(metricFieldInfo.name).getSortedNumeric(metricFieldInfo)
)
);
SequentialDocValuesIterator metricReader;
FieldInfo metricFieldInfo = state.fieldInfos.fieldInfo(metric.getField());
if (metric.getField().equals(DocCountFieldMapper.NAME)) {
metricReader = getIteratorForNumericField(fieldProducerMap, metricFieldInfo, DocCountFieldMapper.NAME);
} else {
if (metric.getBaseMetrics().isEmpty()) continue;
if (metricFieldInfo == null) {
metricFieldInfo = getFieldInfo(metric.getField(), DocValuesType.SORTED_NUMERIC);
}
metricReaders.add(metricReader);
metricReader = new SequentialDocValuesIterator(
new SortedNumericStarTreeValuesIterator(fieldProducerMap.get(metricFieldInfo.name).getSortedNumeric(metricFieldInfo))
);
}
metricReaders.add(metricReader);
}
return metricReaders;
}
Expand Down Expand Up @@ -572,21 +569,29 @@ Long[] getStarTreeDimensionsFromSegment(int currentDocId, SequentialDocValuesIte
*/
private Object[] getStarTreeMetricsFromSegment(int currentDocId, List<SequentialDocValuesIterator> metricsReaders) throws IOException {
Object[] metrics = new Object[numMetrics];
for (int i = 0; i < numMetrics; i++) {
SequentialDocValuesIterator metricStatReader = metricsReaders.get(i);
if (metricStatReader != null) {
int metricIndex = 0;
for (int i = 0; i < starTreeField.getMetrics().size(); i++) {
Metric metric = starTreeField.getMetrics().get(i);
if (metric.getBaseMetrics().isEmpty()) continue;
SequentialDocValuesIterator metricReader = metricsReaders.get(i);
if (metricReader != null) {
try {
metricStatReader.nextEntry(currentDocId);
metricReader.nextEntry(currentDocId);
Object metricValue = metricReader.value(currentDocId);

for (MetricStat metricStat : metric.getBaseMetrics()) {
metrics[metricIndex] = metricValue;
metricIndex++;
}
} catch (IOException e) {
logger.error("unable to iterate to next doc", e);
throw new RuntimeException("unable to iterate to next doc", e);
} catch (Exception e) {
logger.error("unable to read the metric values from the segment", e);
throw new IllegalStateException("unable to read the metric values from the segment", e);
}
metrics[i] = metricStatReader.value(currentDocId);
} else {
throw new IllegalStateException("metric readers are empty");
throw new IllegalStateException("metric reader is empty for metric: " + metric.getField());
}
}
return metrics;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public Iterator<StarTreeDocument> sortAndAggregateSegmentDocuments(
}
try {
for (int i = 0; i < totalSegmentDocs; i++) {
StarTreeDocument document = getSegmentStarTreeDocument(i, dimensionReaders, metricReaders);
StarTreeDocument document = getSegmentStarTreeDocumentWithMetricFieldValues(i, dimensionReaders, metricReaders);
segmentDocumentFileManager.writeStarTreeDocument(document, false);
}
} catch (IOException ex) {
Expand All @@ -128,6 +128,45 @@ public Iterator<StarTreeDocument> sortAndAggregateSegmentDocuments(
return sortAndReduceDocuments(sortedDocIds, totalSegmentDocs, false);
}

/**
* Returns the star-tree document from the segment based on the current doc id
*/
StarTreeDocument getSegmentStarTreeDocumentWithMetricFieldValues(
int currentDocId,
SequentialDocValuesIterator[] dimensionReaders,
List<SequentialDocValuesIterator> metricReaders
) throws IOException {
Long[] dimensions = getStarTreeDimensionsFromSegment(currentDocId, dimensionReaders);
Object[] metricValues = getStarTreeMetricFieldValuesFromSegment(currentDocId, metricReaders);
return new StarTreeDocument(dimensions, metricValues);
}

/**
* Returns the metric field values for the star-tree document from the segment based on the current doc id
*/
private Object[] getStarTreeMetricFieldValuesFromSegment(int currentDocId, List<SequentialDocValuesIterator> metricReaders) {
Object[] metricValues = new Object[starTreeField.getMetrics().size()];
for (int i = 0; i < starTreeField.getMetrics().size(); i++) {
if (starTreeField.getMetrics().get(i).getBaseMetrics().isEmpty()) continue;
SequentialDocValuesIterator metricReader = metricReaders.get(i);
if (metricReader != null) {
try {
metricReader.nextEntry(currentDocId);
metricValues[i] = metricReader.value(currentDocId);
} catch (IOException e) {
logger.error("unable to iterate to next doc", e);
throw new RuntimeException("unable to iterate to next doc", e);
} catch (Exception e) {
logger.error("unable to read the metric values from the segment", e);
throw new IllegalStateException("unable to read the metric values from the segment", e);
}
} else {
throw new IllegalStateException("metric reader is empty");
}
}
return metricValues;
}

/**
* Sorts and aggregates the star-tree documents from multiple segments and builds star tree based on the newly
* aggregated star-tree documents
Expand Down
Loading

0 comments on commit dc8a435

Please sign in to comment.