Skip to content

Commit

Permalink
Star tree codec changes (#14514)
Browse files Browse the repository at this point in the history
---------
Signed-off-by: Bharathwaj G <[email protected]>
  • Loading branch information
bharath-techie committed Aug 20, 2024
1 parent 3b4cdcd commit 23e59c1
Show file tree
Hide file tree
Showing 16 changed files with 662 additions and 4 deletions.
16 changes: 12 additions & 4 deletions server/src/main/java/org/opensearch/index/codec/CodecService.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.opensearch.common.Nullable;
import org.opensearch.common.collect.MapBuilder;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.codec.composite.CompositeCodecFactory;
import org.opensearch.index.mapper.MapperService;

import java.util.Map;
Expand All @@ -63,6 +64,7 @@ public class CodecService {
* the raw unfiltered lucene default. useful for testing
*/
public static final String LUCENE_DEFAULT_CODEC = "lucene_default";
private final CompositeCodecFactory compositeCodecFactory = new CompositeCodecFactory();

public CodecService(@Nullable MapperService mapperService, IndexSettings indexSettings, Logger logger) {
final MapBuilder<String, Codec> codecs = MapBuilder.<String, Codec>newMapBuilder();
Expand All @@ -73,10 +75,16 @@ public CodecService(@Nullable MapperService mapperService, IndexSettings indexSe
codecs.put(BEST_COMPRESSION_CODEC, new Lucene99Codec(Mode.BEST_COMPRESSION));
codecs.put(ZLIB, new Lucene99Codec(Mode.BEST_COMPRESSION));
} else {
codecs.put(DEFAULT_CODEC, new PerFieldMappingPostingFormatCodec(Mode.BEST_SPEED, mapperService, logger));
codecs.put(LZ4, new PerFieldMappingPostingFormatCodec(Mode.BEST_SPEED, mapperService, logger));
codecs.put(BEST_COMPRESSION_CODEC, new PerFieldMappingPostingFormatCodec(Mode.BEST_COMPRESSION, mapperService, logger));
codecs.put(ZLIB, new PerFieldMappingPostingFormatCodec(Mode.BEST_COMPRESSION, mapperService, logger));
// CompositeCodec still delegates to PerFieldMappingPostingFormatCodec
// We can still support all the compression codecs when composite index is present
if (mapperService.isCompositeIndexPresent()) {
codecs.putAll(compositeCodecFactory.getCompositeIndexCodecs(mapperService, logger));
} else {
codecs.put(DEFAULT_CODEC, new PerFieldMappingPostingFormatCodec(Mode.BEST_SPEED, mapperService, logger));
codecs.put(LZ4, new PerFieldMappingPostingFormatCodec(Mode.BEST_SPEED, mapperService, logger));
codecs.put(BEST_COMPRESSION_CODEC, new PerFieldMappingPostingFormatCodec(Mode.BEST_COMPRESSION, mapperService, logger));
codecs.put(ZLIB, new PerFieldMappingPostingFormatCodec(Mode.BEST_COMPRESSION, mapperService, logger));
}
}
codecs.put(LUCENE_DEFAULT_CODEC, Codec.getDefault());
for (String codec : Codec.availableCodecs()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.codec.composite;

import org.apache.logging.log4j.Logger;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.codecs.DocValuesFormat;
import org.apache.lucene.codecs.FilterCodec;
import org.apache.lucene.codecs.lucene99.Lucene99Codec;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.index.codec.PerFieldMappingPostingFormatCodec;
import org.opensearch.index.mapper.MapperService;

/**
* Extends the Codec to support new file formats for composite indices eg: star tree index
* based on the mappings.
*
* @opensearch.experimental
*/
@ExperimentalApi
public class Composite99Codec extends FilterCodec {
public static final String COMPOSITE_INDEX_CODEC_NAME = "Composite99Codec";
private final MapperService mapperService;

// needed for SPI - this is used in reader path
public Composite99Codec() {
this(COMPOSITE_INDEX_CODEC_NAME, new Lucene99Codec(), null);
}

public Composite99Codec(Lucene99Codec.Mode compressionMode, MapperService mapperService, Logger logger) {
this(COMPOSITE_INDEX_CODEC_NAME, new PerFieldMappingPostingFormatCodec(compressionMode, mapperService, logger), mapperService);
}

/**
* Sole constructor. When subclassing this codec, create a no-arg ctor and pass the delegate codec and a unique name to
* this ctor.
*
* @param name name of the codec
* @param delegate codec delegate
* @param mapperService mapper service instance
*/
protected Composite99Codec(String name, Codec delegate, MapperService mapperService) {
super(name, delegate);
this.mapperService = mapperService;
}

@Override
public DocValuesFormat docValuesFormat() {
return new Composite99DocValuesFormat(mapperService);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.codec.composite;

import org.apache.lucene.codecs.DocValuesConsumer;
import org.apache.lucene.codecs.DocValuesFormat;
import org.apache.lucene.codecs.DocValuesProducer;
import org.apache.lucene.codecs.lucene90.Lucene90DocValuesFormat;
import org.apache.lucene.index.SegmentReadState;
import org.apache.lucene.index.SegmentWriteState;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.index.mapper.MapperService;

import java.io.IOException;

/**
* DocValues format to handle composite indices
*
* @opensearch.experimental
*/
@ExperimentalApi
public class Composite99DocValuesFormat extends DocValuesFormat {
/**
* Creates a new docvalues format.
*
* <p>The provided name will be written into the index segment in some configurations (such as
* when using {@code PerFieldDocValuesFormat}): in such configurations, for the segment to be read
* this class should be registered with Java's SPI mechanism (registered in META-INF/ of your jar
* file, etc).
*/
private final DocValuesFormat delegate;
private final MapperService mapperService;

// needed for SPI
public Composite99DocValuesFormat() {
this(new Lucene90DocValuesFormat(), null);
}

public Composite99DocValuesFormat(MapperService mapperService) {
this(new Lucene90DocValuesFormat(), mapperService);
}

public Composite99DocValuesFormat(DocValuesFormat delegate, MapperService mapperService) {
super(delegate.getName());
this.delegate = delegate;
this.mapperService = mapperService;
}

@Override
public DocValuesConsumer fieldsConsumer(SegmentWriteState state) throws IOException {
return new Composite99DocValuesWriter(delegate.fieldsConsumer(state), state, mapperService);
}

@Override
public DocValuesProducer fieldsProducer(SegmentReadState state) throws IOException {
return new Composite99DocValuesReader(delegate.fieldsProducer(state), state);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.codec.composite;

import org.apache.lucene.codecs.DocValuesProducer;
import org.apache.lucene.index.BinaryDocValues;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.SegmentReadState;
import org.apache.lucene.index.SortedDocValues;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.index.SortedSetDocValues;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.index.mapper.CompositeMappedFieldType;

import java.io.IOException;
import java.util.List;

/**
* Reader for star tree index and star tree doc values from the segments
*
* @opensearch.experimental
*/
@ExperimentalApi
public class Composite99DocValuesReader extends DocValuesProducer implements CompositeIndexReader {
private DocValuesProducer delegate;

public Composite99DocValuesReader(DocValuesProducer producer, SegmentReadState state) throws IOException {
this.delegate = producer;
// TODO : read star tree files
}

@Override
public NumericDocValues getNumeric(FieldInfo field) throws IOException {
return delegate.getNumeric(field);
}

@Override
public BinaryDocValues getBinary(FieldInfo field) throws IOException {
return delegate.getBinary(field);
}

@Override
public SortedDocValues getSorted(FieldInfo field) throws IOException {
return delegate.getSorted(field);
}

@Override
public SortedNumericDocValues getSortedNumeric(FieldInfo field) throws IOException {
return delegate.getSortedNumeric(field);
}

@Override
public SortedSetDocValues getSortedSet(FieldInfo field) throws IOException {
return delegate.getSortedSet(field);
}

@Override
public void checkIntegrity() throws IOException {
delegate.checkIntegrity();
// Todo : check integrity of composite index related [star tree] files
}

@Override
public void close() throws IOException {
delegate.close();
// Todo: close composite index related files [star tree] files
}

@Override
public List<String> getCompositeIndexFields() {
// todo : read from file formats and get the field names.
throw new UnsupportedOperationException();

}

@Override
public CompositeIndexValues getCompositeIndexValues(String field, CompositeMappedFieldType.CompositeFieldType fieldType)
throws IOException {
// TODO : read compositeIndexValues [starTreeValues] from star tree files
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.codec.composite;

import org.apache.lucene.codecs.DocValuesConsumer;
import org.apache.lucene.codecs.DocValuesProducer;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.MergeState;
import org.apache.lucene.index.SegmentWriteState;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.index.mapper.CompositeMappedFieldType;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.mapper.StarTreeMapper;

import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

/**
* This class write the star tree index and star tree doc values
* based on the doc values structures of the original index
*
* @opensearch.experimental
*/
@ExperimentalApi
public class Composite99DocValuesWriter extends DocValuesConsumer {
private final DocValuesConsumer delegate;
private final SegmentWriteState state;
private final MapperService mapperService;
AtomicReference<MergeState> mergeState = new AtomicReference<>();
private final Set<CompositeMappedFieldType> compositeMappedFieldTypes;
private final Set<String> compositeFieldSet;

private final Map<String, DocValuesProducer> fieldProducerMap = new HashMap<>();

public Composite99DocValuesWriter(DocValuesConsumer delegate, SegmentWriteState segmentWriteState, MapperService mapperService) {

this.delegate = delegate;
this.state = segmentWriteState;
this.mapperService = mapperService;
this.compositeMappedFieldTypes = mapperService.getCompositeFieldTypes();
compositeFieldSet = new HashSet<>();
for (CompositeMappedFieldType type : compositeMappedFieldTypes) {
compositeFieldSet.addAll(type.fields());
}
}

@Override
public void addNumericField(FieldInfo field, DocValuesProducer valuesProducer) throws IOException {
delegate.addNumericField(field, valuesProducer);
}

@Override
public void addBinaryField(FieldInfo field, DocValuesProducer valuesProducer) throws IOException {
delegate.addBinaryField(field, valuesProducer);
}

@Override
public void addSortedField(FieldInfo field, DocValuesProducer valuesProducer) throws IOException {
delegate.addSortedField(field, valuesProducer);
}

@Override
public void addSortedNumericField(FieldInfo field, DocValuesProducer valuesProducer) throws IOException {
delegate.addSortedNumericField(field, valuesProducer);
// Perform this only during flush flow
if (mergeState.get() == null) {
createCompositeIndicesIfPossible(valuesProducer, field);
}
}

@Override
public void addSortedSetField(FieldInfo field, DocValuesProducer valuesProducer) throws IOException {
delegate.addSortedSetField(field, valuesProducer);
}

@Override
public void close() throws IOException {
delegate.close();
}

private void createCompositeIndicesIfPossible(DocValuesProducer valuesProducer, FieldInfo field) throws IOException {
if (compositeFieldSet.isEmpty()) return;
if (compositeFieldSet.contains(field.name)) {
fieldProducerMap.put(field.name, valuesProducer);
compositeFieldSet.remove(field.name);
}
// we have all the required fields to build composite fields
if (compositeFieldSet.isEmpty()) {
for (CompositeMappedFieldType mappedType : compositeMappedFieldTypes) {
if (mappedType instanceof StarTreeMapper.StarTreeFieldType) {
// TODO : Call StarTree builder
}
}
}
}

@Override
public void merge(MergeState mergeState) throws IOException {
this.mergeState.compareAndSet(null, mergeState);
super.merge(mergeState);
// TODO : handle merge star tree
// mergeStarTreeFields(mergeState);
}
}
Loading

0 comments on commit 23e59c1

Please sign in to comment.