Skip to content

Commit

Permalink
Star tree merge changes
Browse files Browse the repository at this point in the history
Signed-off-by: Bharathwaj G <[email protected]>
  • Loading branch information
bharath-techie authored and sarthakaggarwal97 committed Jul 5, 2024
1 parent 4232692 commit c6a78b5
Show file tree
Hide file tree
Showing 19 changed files with 774 additions and 347 deletions.
259 changes: 136 additions & 123 deletions server/src/main/java/org/apache/lucene/index/BaseStarTreeBuilder.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.opensearch.index.mapper.MapperService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

Expand Down Expand Up @@ -76,15 +77,14 @@ public void close() throws IOException {
}

@Override
public List<String> getCompositeIndexFields() {
public List<CompositeIndexFieldInfo> getCompositeIndexFields() {
// todo : read from file formats and get the field names.
throw new UnsupportedOperationException();
return new ArrayList<>();

}

@Override
public CompositeIndexValues getCompositeIndexValues(String field, CompositeMappedFieldType.CompositeFieldType fieldType)
throws IOException {
public CompositeIndexValues getCompositeIndexValues(CompositeIndexFieldInfo compositeIndexFieldInfo) throws IOException {
// TODO : read compositeIndexValues [starTreeValues] from star tree files
throw new UnsupportedOperationException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,25 @@

package org.opensearch.index.codec.composite;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.codecs.DocValuesConsumer;
import org.apache.lucene.codecs.DocValuesProducer;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.MergeState;
import org.apache.lucene.index.SegmentWriteState;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.index.codec.composite.datacube.startree.StarTreeValues;
import org.opensearch.index.compositeindex.datacube.startree.StarTreeField;
import org.opensearch.index.compositeindex.datacube.startree.builder.StarTreesBuilder;
import org.opensearch.index.mapper.CompositeMappedFieldType;
import org.opensearch.index.mapper.MapperService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

Expand All @@ -39,6 +46,7 @@ public class Composite90DocValuesWriter extends DocValuesConsumer {
private final Set<String> compositeFieldSet;

private final Map<String, DocValuesProducer> fieldProducerMap = new HashMap<>();
private static final Logger logger = LogManager.getLogger(Composite90DocValuesWriter.class);

public Composite90DocValuesWriter(DocValuesConsumer delegate, SegmentWriteState segmentWriteState, MapperService mapperService)
throws IOException {
Expand Down Expand Up @@ -100,6 +108,8 @@ private void createCompositeIndicesIfPossible(DocValuesProducer valuesProducer,
// TODO : Call StarTree builder
}
}
StarTreesBuilder starTreesBuilder = new StarTreesBuilder(state, mapperService);
starTreesBuilder.build(fieldProducerMap);
}
}

Expand All @@ -108,7 +118,57 @@ public void merge(MergeState mergeState) throws IOException {
// TODO : check if class variable will cause concurrency issues
this.mergeState = mergeState;
super.merge(mergeState);
// TODO : handle merge star tree
// mergeStarTreeFields(mergeState);
mergeCompositeFields(mergeState);
}

/**
 * Merges the composite fields of the segments taking part in a merge.
 * At present this only delegates to {@link #mergeStarTreeFields(MergeState)}.
 *
 * @param mergeState merge state
 * @throws IOException if the star tree merge fails with an I/O error
 */
private void mergeCompositeFields(MergeState mergeState) throws IOException {
    this.mergeStarTreeFields(mergeState);
}

/**
 * Merges star tree data fields from multiple segments.
 * <p>
 * Walks the doc values producers of the merge state, skipping entries that are {@code null}
 * or that are not {@link CompositeIndexReader}s. For every star tree field exposed by a reader
 * the corresponding {@link StarTreeValues} are collected per field name, and the first
 * {@link StarTreeField} seen for a field name is remembered as that field's configuration.
 * The collected maps are then handed to {@link StarTreesBuilder#buildDuringMerge}.
 *
 * @param mergeState merge state
 * @throws IOException if reading composite index values or building the merged star trees fails
 */
private void mergeStarTreeFields(MergeState mergeState) throws IOException {
    Map<String, List<StarTreeValues>> starTreeSubsPerField = new HashMap<>();
    Map<String, StarTreeField> starTreeFieldMap = new HashMap<>();
    for (DocValuesProducer producer : mergeState.docValuesProducers) {
        // instanceof is false for null, so this also skips missing producers
        if (!(producer instanceof CompositeIndexReader)) {
            continue;
        }
        CompositeIndexReader reader = (CompositeIndexReader) producer;

        for (CompositeIndexFieldInfo fieldInfo : reader.getCompositeIndexFields()) {
            if (!CompositeMappedFieldType.CompositeFieldType.STAR_TREE.equals(fieldInfo.getType())) {
                continue;
            }
            CompositeIndexValues compositeIndexValues = reader.getCompositeIndexValues(fieldInfo);
            if (!(compositeIndexValues instanceof StarTreeValues)) {
                continue;
            }
            StarTreeValues starTreeValues = (StarTreeValues) compositeIndexValues;
            String fieldName = fieldInfo.getField();
            StarTreeField segmentStarTreeField = starTreeValues.getStarTreeField();

            StarTreeField knownStarTreeField = starTreeFieldMap.get(fieldName);
            if (knownStarTreeField == null) {
                starTreeFieldMap.put(fieldName, segmentStarTreeField);
            } else {
                // star tree configuration is expected to be the same across segments;
                // only report an error when it actually differs
                boolean sameConfiguration = knownStarTreeField.equals(segmentStarTreeField);
                if (!sameConfiguration) {
                    logger.error("Star tree configuration is not same for segments during merge");
                }
                assert sameConfiguration : "Star tree configuration is not same for segments during merge";
            }

            // a mutable list must back each entry: Collections.emptyList() rejects add()
            starTreeSubsPerField.computeIfAbsent(fieldName, key -> new ArrayList<>()).add(starTreeValues);
        }
    }
    final StarTreesBuilder starTreesBuilder = new StarTreesBuilder(state, mapperService);
    starTreesBuilder.buildDuringMerge(starTreeFieldMap, starTreeSubsPerField);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.codec.composite;

import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.index.mapper.CompositeMappedFieldType;

/**
 * Immutable pairing of a composite index field name with its composite field type.
 *
 * @opensearch.experimental
 */
@ExperimentalApi
public class CompositeIndexFieldInfo {
    private final String field;
    private final CompositeMappedFieldType.CompositeFieldType type;

    /**
     * Creates the field info; both arguments are stored as given, without validation.
     *
     * @param field name of the composite index field
     * @param type  composite field type of the field
     */
    public CompositeIndexFieldInfo(String field, CompositeMappedFieldType.CompositeFieldType type) {
        this.type = type;
        this.field = field;
    }

    /**
     * @return the field name passed to the constructor
     */
    public String getField() {
        return this.field;
    }

    /**
     * @return the composite field type passed to the constructor
     */
    public CompositeMappedFieldType.CompositeFieldType getType() {
        return this.type;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
package org.opensearch.index.codec.composite;

import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.index.mapper.CompositeMappedFieldType;

import java.io.IOException;
import java.util.List;
Expand All @@ -25,10 +24,10 @@ public interface CompositeIndexReader {
* Get list of composite index fields from the segment
*
*/
List<String> getCompositeIndexFields();
List<CompositeIndexFieldInfo> getCompositeIndexFields();

/**
* Get composite index values for the given composite index field info (field name and field type)
*/
CompositeIndexValues getCompositeIndexValues(String field, CompositeMappedFieldType.CompositeFieldType fieldType) throws IOException;
CompositeIndexValues getCompositeIndexValues(CompositeIndexFieldInfo fieldInfo) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@

package org.opensearch.index.codec.composite.datacube.startree;

import org.apache.lucene.search.DocIdSetIterator;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.index.codec.composite.CompositeIndexValues;
import org.opensearch.index.compositeindex.datacube.startree.StarTreeField;
import org.opensearch.index.compositeindex.datacube.startree.node.StarTreeNode;

import java.util.List;
import java.util.Map;

/**
* Concrete class that holds the star tree associated values from the segment
Expand All @@ -20,16 +23,41 @@
*/
@ExperimentalApi
public class StarTreeValues implements CompositeIndexValues {
private final List<String> dimensionsOrder;
private final StarTreeField starTreeField;
private final StarTreeNode root;
private final Map<String, DocIdSetIterator> dimensionDocValuesIteratorMap;
private final Map<String, DocIdSetIterator> metricDocValuesIteratorMap;

// TODO : come up with full set of values such as dimensions and metrics doc values + star tree
public StarTreeValues(List<String> dimensionsOrder) {
super();
this.dimensionsOrder = dimensionsOrder;
/**
 * Creates the holder; every argument is stored by reference, without copying or validation.
 *
 * @param starTreeField                 star tree field these values belong to
 * @param root                          star tree node exposed through {@link #getRoot()}
 *                                      (presumably the root of the segment's star tree)
 * @param dimensionDocValuesIteratorMap doc id set iterators keyed by string
 *                                      (presumably the dimension name)
 * @param metricDocValuesIteratorMap    doc id set iterators keyed by string
 *                                      (presumably the metric name)
 */
public StarTreeValues(
    StarTreeField starTreeField,
    StarTreeNode root,
    Map<String, DocIdSetIterator> dimensionDocValuesIteratorMap,
    Map<String, DocIdSetIterator> metricDocValuesIteratorMap
) {
    this.metricDocValuesIteratorMap = metricDocValuesIteratorMap;
    this.dimensionDocValuesIteratorMap = dimensionDocValuesIteratorMap;
    this.root = root;
    this.starTreeField = starTreeField;
}

/**
 * @return this instance, which is itself the composite index values
 */
@Override
public CompositeIndexValues getValues() {
    final CompositeIndexValues self = this;
    return self;
}

/**
 * @return the star tree field supplied at construction
 */
public StarTreeField getStarTreeField() {
    return this.starTreeField;
}

/**
 * @return the star tree node supplied at construction as {@code root}
 */
public StarTreeNode getRoot() {
    return this.root;
}

/**
 * @return the dimension iterator map supplied at construction (same reference, not a copy)
 */
public Map<String, DocIdSetIterator> getDimensionDocValuesIteratorMap() {
    return this.dimensionDocValuesIteratorMap;
}

/**
 * @return the metric iterator map supplied at construction (same reference, not a copy)
 */
public Map<String, DocIdSetIterator> getMetricDocValuesIteratorMap() {
    return this.metricDocValuesIteratorMap;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

import org.opensearch.index.compositeindex.datacube.MetricStat;
import org.opensearch.index.compositeindex.datacube.startree.aggregators.numerictype.StarTreeNumericType;
import org.opensearch.index.compositeindex.datacube.startree.utils.SequentialDocValuesIterator;
import org.opensearch.index.fielddata.IndexNumericFieldData;

import java.util.Comparator;
Expand All @@ -28,22 +27,14 @@ public class MetricAggregatorInfo implements Comparable<MetricAggregatorInfo> {
private final String field;
private final ValueAggregator valueAggregators;
private final StarTreeNumericType starTreeNumericType;
private final SequentialDocValuesIterator metricStatReader;

/**
* Constructor for MetricAggregatorInfo
*/
public MetricAggregatorInfo(
MetricStat metricStat,
String field,
String starFieldName,
IndexNumericFieldData.NumericType numericType,
SequentialDocValuesIterator metricStatReader
) {
public MetricAggregatorInfo(MetricStat metricStat, String field, String starFieldName, IndexNumericFieldData.NumericType numericType) {
this.metricStat = metricStat;
this.valueAggregators = ValueAggregatorFactory.getValueAggregator(metricStat);
this.starTreeNumericType = StarTreeNumericType.fromNumericType(numericType);
this.metricStatReader = metricStatReader;
this.field = field;
this.starFieldName = starFieldName;
this.metric = toFieldName();
Expand Down Expand Up @@ -84,13 +75,6 @@ public StarTreeNumericType getAggregatedValueType() {
return starTreeNumericType;
}

/**
* @return metric value reader iterator
*/
public SequentialDocValuesIterator getMetricStatReader() {
return metricStatReader;
}

/**
* @return field name with metric type and field
*/
Expand Down
Loading

0 comments on commit c6a78b5

Please sign in to comment.