Skip to content

Commit

Permalink
Star tree merge changes
Browse files Browse the repository at this point in the history
Signed-off-by: Bharathwaj G <[email protected]>
  • Loading branch information
bharath-techie authored and sarthakaggarwal97 committed Jul 18, 2024
1 parent 0689c64 commit 58f34ae
Show file tree
Hide file tree
Showing 29 changed files with 1,181 additions and 720 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.index.SortedSetDocValues;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.index.mapper.CompositeMappedFieldType;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
Expand Down Expand Up @@ -74,15 +74,13 @@ public void close() throws IOException {
}

@Override
public List<String> getCompositeIndexFields() {
public List<CompositeIndexFieldInfo> getCompositeIndexFields() {
// todo : read from file formats and get the field names.
throw new UnsupportedOperationException();

return new ArrayList<>();
}

@Override
public CompositeIndexValues getCompositeIndexValues(String field, CompositeMappedFieldType.CompositeFieldType fieldType)
throws IOException {
public CompositeIndexValues getCompositeIndexValues(CompositeIndexFieldInfo compositeIndexFieldInfo) throws IOException {
// TODO : read compositeIndexValues [starTreeValues] from star tree files
throw new UnsupportedOperationException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,25 @@

package org.opensearch.index.codec.composite;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.codecs.DocValuesConsumer;
import org.apache.lucene.codecs.DocValuesProducer;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.MergeState;
import org.apache.lucene.index.SegmentWriteState;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.index.codec.composite.datacube.startree.StarTreeValues;
import org.opensearch.index.compositeindex.datacube.startree.StarTreeField;
import org.opensearch.index.compositeindex.datacube.startree.builder.StarTreesBuilder;
import org.opensearch.index.mapper.CompositeMappedFieldType;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.mapper.StarTreeMapper;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -42,6 +47,7 @@ public class Composite99DocValuesWriter extends DocValuesConsumer {
private final Set<String> compositeFieldSet;

private final Map<String, DocValuesProducer> fieldProducerMap = new HashMap<>();
private static final Logger logger = LogManager.getLogger(Composite99DocValuesWriter.class);

public Composite99DocValuesWriter(DocValuesConsumer delegate, SegmentWriteState segmentWriteState, MapperService mapperService) {

Expand Down Expand Up @@ -98,9 +104,9 @@ private void createCompositeIndicesIfPossible(DocValuesProducer valuesProducer,
// we have all the required fields to build composite fields
if (compositeFieldSet.isEmpty()) {
for (CompositeMappedFieldType mappedType : compositeMappedFieldTypes) {
if (mappedType instanceof StarTreeMapper.StarTreeFieldType) {
try (StarTreesBuilder starTreesBuilder = new StarTreesBuilder(fieldProducerMap, state, mapperService)) {
starTreesBuilder.build();
if (mappedType.getCompositeIndexType().equals(CompositeMappedFieldType.CompositeFieldType.STAR_TREE)) {
try (StarTreesBuilder starTreesBuilder = new StarTreesBuilder(state, mapperService)) {
starTreesBuilder.build(fieldProducerMap);
}
}
}
Expand All @@ -111,7 +117,58 @@ private void createCompositeIndicesIfPossible(DocValuesProducer valuesProducer,
public void merge(MergeState mergeState) throws IOException {
this.mergeState.compareAndSet(null, mergeState);
super.merge(mergeState);
// TODO : handle merge star tree
// mergeStarTreeFields(mergeState);
mergeCompositeFields(mergeState);
}

/**
* Merges composite fields from multiple segments
* @param mergeState merge state
*/
private void mergeCompositeFields(MergeState mergeState) throws IOException {
mergeStarTreeFields(mergeState);
}

/**
* Merges star tree data fields from multiple segments
* @param mergeState merge state
*/
private void mergeStarTreeFields(MergeState mergeState) throws IOException {
Map<String, List<StarTreeValues>> starTreeSubsPerField = new HashMap<>();
Map<String, StarTreeField> starTreeFieldMap = new HashMap<>();
for (int i = 0; i < mergeState.docValuesProducers.length; i++) {
CompositeIndexReader reader = null;
if (mergeState.docValuesProducers[i] == null) {
continue;
}
if (mergeState.docValuesProducers[i] instanceof CompositeIndexReader) {
reader = (CompositeIndexReader) mergeState.docValuesProducers[i];
} else {
continue;
}

List<CompositeIndexFieldInfo> compositeFieldInfo = reader.getCompositeIndexFields();
for (CompositeIndexFieldInfo fieldInfo : compositeFieldInfo) {
if (fieldInfo.getType().equals(CompositeMappedFieldType.CompositeFieldType.STAR_TREE)) {
CompositeIndexValues compositeIndexValues = reader.getCompositeIndexValues(fieldInfo);
if (compositeIndexValues instanceof StarTreeValues) {
List<StarTreeValues> fieldsList = starTreeSubsPerField.getOrDefault(fieldInfo.getField(), Collections.emptyList());

if (!starTreeFieldMap.containsKey(fieldInfo.getField())) {
starTreeFieldMap.put(fieldInfo.getField(), ((StarTreeValues) compositeIndexValues).getStarTreeField());
}
// assert star tree configuration is same across segments
else {
assert starTreeFieldMap.get(fieldInfo.getField())
.equals(((StarTreeValues) compositeIndexValues).getStarTreeField());
logger.error("Star tree configuration is not same for segments during merge");
}
fieldsList.add((StarTreeValues) compositeIndexValues);
starTreeSubsPerField.put(fieldInfo.getField(), fieldsList);
}
}
}
}
final StarTreesBuilder starTreesBuilder = new StarTreesBuilder(state, mapperService);
starTreesBuilder.buildDuringMerge(starTreeFieldMap, starTreeSubsPerField);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@

/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.codec.composite;

import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.index.mapper.CompositeMappedFieldType;

/**
* Field info details of composite index fields
*
* @opensearch.experimental
*/
@ExperimentalApi
public class CompositeIndexFieldInfo {
private final String field;
private final CompositeMappedFieldType.CompositeFieldType type;

public CompositeIndexFieldInfo(String field, CompositeMappedFieldType.CompositeFieldType type) {
this.field = field;
this.type = type;
}

public String getField() {
return field;
}

public CompositeMappedFieldType.CompositeFieldType getType() {
return type;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
package org.opensearch.index.codec.composite;

import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.index.mapper.CompositeMappedFieldType;

import java.io.IOException;
import java.util.List;
Expand All @@ -25,10 +24,10 @@ public interface CompositeIndexReader {
* Get list of composite index fields from the segment
*
*/
List<String> getCompositeIndexFields();
List<CompositeIndexFieldInfo> getCompositeIndexFields();

/**
* Get composite index values based on the field name and the field type
*/
CompositeIndexValues getCompositeIndexValues(String field, CompositeMappedFieldType.CompositeFieldType fieldType) throws IOException;
CompositeIndexValues getCompositeIndexValues(CompositeIndexFieldInfo fieldInfo) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@

package org.opensearch.index.codec.composite.datacube.startree;

import org.apache.lucene.search.DocIdSetIterator;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.index.codec.composite.CompositeIndexValues;
import org.opensearch.index.compositeindex.datacube.startree.StarTreeField;
import org.opensearch.index.compositeindex.datacube.startree.node.StarTreeNode;

import java.util.List;
import java.util.Map;

/**
* Concrete class that holds the star tree associated values from the segment
Expand All @@ -20,16 +23,41 @@
*/
@ExperimentalApi
public class StarTreeValues implements CompositeIndexValues {
private final List<String> dimensionsOrder;
private final StarTreeField starTreeField;
private final StarTreeNode root;
private final Map<String, DocIdSetIterator> dimensionDocValuesIteratorMap;
private final Map<String, DocIdSetIterator> metricDocValuesIteratorMap;

// TODO : come up with full set of vales such as dimensions and metrics doc values + star tree
public StarTreeValues(List<String> dimensionsOrder) {
super();
this.dimensionsOrder = List.copyOf(dimensionsOrder);
public StarTreeValues(
StarTreeField starTreeField,
StarTreeNode root,
Map<String, DocIdSetIterator> dimensionDocValuesIteratorMap,
Map<String, DocIdSetIterator> metricDocValuesIteratorMap
) {
this.starTreeField = starTreeField;
this.root = root;
this.dimensionDocValuesIteratorMap = dimensionDocValuesIteratorMap;
this.metricDocValuesIteratorMap = metricDocValuesIteratorMap;
}

@Override
public CompositeIndexValues getValues() {
return this;
}

public StarTreeField getStarTreeField() {
return starTreeField;
}

public StarTreeNode getRoot() {
return root;
}

public Map<String, DocIdSetIterator> getDimensionDocValuesIteratorMap() {
return dimensionDocValuesIteratorMap;
}

public Map<String, DocIdSetIterator> getMetricDocValuesIteratorMap() {
return metricDocValuesIteratorMap;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public Long toLongValue(Long value) {
}

@Override
public Long toStarTreeNumericTypeValue(Long value, StarTreeNumericType type) {
public Long toStarTreeNumericTypeValue(Long value) {
return value;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,13 @@

import org.opensearch.index.compositeindex.datacube.MetricStat;
import org.opensearch.index.compositeindex.datacube.startree.aggregators.numerictype.StarTreeNumericType;
import org.opensearch.index.compositeindex.datacube.startree.utils.SequentialDocValuesIterator;
import org.opensearch.index.fielddata.IndexNumericFieldData;

import java.util.Comparator;
import java.util.Objects;

/**
* Builds aggregation function and doc values field pair to support various aggregations
*
* @opensearch.experimental
*/
public class MetricAggregatorInfo implements Comparable<MetricAggregatorInfo> {
Expand All @@ -29,22 +27,14 @@ public class MetricAggregatorInfo implements Comparable<MetricAggregatorInfo> {
private final String field;
private final ValueAggregator valueAggregators;
private final StarTreeNumericType starTreeNumericType;
private final SequentialDocValuesIterator metricStatReader;

/**
* Constructor for MetricAggregatorInfo
*/
public MetricAggregatorInfo(
MetricStat metricStat,
String field,
String starFieldName,
IndexNumericFieldData.NumericType numericType,
SequentialDocValuesIterator metricStatReader
) {
public MetricAggregatorInfo(MetricStat metricStat, String field, String starFieldName, IndexNumericFieldData.NumericType numericType) {
this.metricStat = metricStat;
this.valueAggregators = ValueAggregatorFactory.getValueAggregator(metricStat);
this.starTreeNumericType = StarTreeNumericType.fromNumericType(numericType);
this.metricStatReader = metricStatReader;
this.field = field;
this.starFieldName = starFieldName;
this.metric = toFieldName();
Expand Down Expand Up @@ -85,13 +75,6 @@ public StarTreeNumericType getAggregatedValueType() {
return starTreeNumericType;
}

/**
* @return metric value reader iterator
*/
public SequentialDocValuesIterator getMetricStatReader() {
return metricStatReader;
}

/**
* @return field name with metric type and field
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@ public Long toLongValue(Double value) {
}

@Override
public Double toStarTreeNumericTypeValue(Long value, StarTreeNumericType type) {
public Double toStarTreeNumericTypeValue(Long value) {
try {
return type.getDoubleValue(value);
return VALUE_AGGREGATOR_TYPE.getDoubleValue(value);
} catch (Exception e) {
throw new IllegalStateException("Cannot convert " + value + " to sortable aggregation type", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,5 +60,5 @@ public interface ValueAggregator<A> {
/**
* Converts an aggregated value from a Long type.
*/
A toStarTreeNumericTypeValue(Long rawValue, StarTreeNumericType type);
A toStarTreeNumericTypeValue(Long rawValue);
}
Loading

0 comments on commit 58f34ae

Please sign in to comment.