diff --git a/sandbox/modules/custom-codecs/build.gradle b/sandbox/modules/custom-codecs/build.gradle new file mode 100644 index 0000000000000..f39e96f1dae5b --- /dev/null +++ b/sandbox/modules/custom-codecs/build.gradle @@ -0,0 +1,29 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +apply plugin: 'opensearch.opensearchplugin' +apply plugin: 'opensearch.yaml-rest-test' + +opensearchplugin { + name 'custom-codecs' + description 'A plugin that implements custom compression codecs.' + classname 'org.opensearch.index.codec.customcodecs.CustomCodecPlugin' + licenseFile rootProject.file('licenses/APACHE-LICENSE-2.0.txt') + noticeFile rootProject.file('NOTICE.txt') +} + +dependencies { + api "com.intel.qat:qat-java:1.1.0" + testImplementation project(':server').sourceSets.test.output +} + +yamlRestTest.enabled = false; +testingConventions.enabled = false; diff --git a/sandbox/modules/custom-codecs/licenses/qat-java-1.1.0.jar.sha1 b/sandbox/modules/custom-codecs/licenses/qat-java-1.1.0.jar.sha1 new file mode 100644 index 0000000000000..7a102206ada11 --- /dev/null +++ b/sandbox/modules/custom-codecs/licenses/qat-java-1.1.0.jar.sha1 @@ -0,0 +1 @@ +fbcaabdbf9d2a72d4b8222e7cfb2043a68b7860e diff --git a/sandbox/modules/custom-codecs/licenses/qat-java-LICENSE.txt b/sandbox/modules/custom-codecs/licenses/qat-java-LICENSE.txt new file mode 100644 index 0000000000000..d9356397f1250 --- /dev/null +++ b/sandbox/modules/custom-codecs/licenses/qat-java-LICENSE.txt @@ -0,0 +1,36 @@ +----------------------------------------------------------------------------- +** Beginning of "BSD License" text. ** + +Qat-Java: Qat-Java is a compression library that uses Intel® QAT to accelerate +compression and decompression. 
+ +Copyright(c) 2007-2023 Intel Corporation. All rights reserved. +All rights reserved. + +BSD License + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions +are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in + the documentation and/or other materials provided with the + distribution. + * Neither the name of Intel Corporation nor the names of its + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/sandbox/modules/custom-codecs/licenses/qat-java-NOTICE.txt b/sandbox/modules/custom-codecs/licenses/qat-java-NOTICE.txt new file mode 100644 index 0000000000000..9e422fd6919fc --- /dev/null +++ b/sandbox/modules/custom-codecs/licenses/qat-java-NOTICE.txt @@ -0,0 +1 @@ +Qat-Java is a compression library that uses Intel® QAT to accelerate compression and decompression. 
diff --git a/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/CustomCodecPlugin.java b/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/CustomCodecPlugin.java new file mode 100644 index 0000000000000..02f1f09ceb0f2 --- /dev/null +++ b/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/CustomCodecPlugin.java @@ -0,0 +1,48 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.customcodecs; + +import org.opensearch.common.settings.Setting; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.codec.CodecServiceFactory; +import org.opensearch.plugins.EnginePlugin; +import org.opensearch.plugins.Plugin; + +import java.util.Arrays; +import java.util.List; +import java.util.Optional; + +/** + * A plugin that implements custom codecs. 
Supports these codecs: + * {@code QDEFLATE} and {@code QLZ4}. + * + * @opensearch.internal + */ +public final class CustomCodecPlugin extends Plugin implements EnginePlugin { + + /** Creates a new instance */ + public CustomCodecPlugin() {} + + /** + * @param indexSettings is the default indexSettings + * @return the custom codec service factory + */ + @Override + public Optional<CodecServiceFactory> getCustomCodecServiceFactory(final IndexSettings indexSettings) { + return Optional.of(new CustomCodecServiceFactory()); + } + + @Override + public List<Setting<?>> getSettings() { + return Arrays.asList(Lucene99QatCodec.INDEX_CODEC_MODE_SETTING); // registers index.codec.mode with the node + } +} diff --git a/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/CustomCodecService.java b/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/CustomCodecService.java new file mode 100644 index 0000000000000..e580464d4ba18 --- /dev/null +++ b/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/CustomCodecService.java @@ -0,0 +1,68 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.customcodecs; + +import org.apache.lucene.codecs.Codec; +import org.opensearch.common.collect.MapBuilder; +import org.opensearch.index.codec.CodecService; +import org.opensearch.index.codec.CodecServiceConfig; +import org.opensearch.index.mapper.MapperService; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Map; + +/** + * CustomCodecService provides QDEFLATE and QLZ4 compression codecs. 
+ */ +public class CustomCodecService extends CodecService { + private final Map<String, Codec> codecs; + + /** + * Creates a codec service that additionally registers the QDEFLATE and QLZ4 codecs. + * @param codecServiceConfig Generic codec service config + */ + public CustomCodecService(CodecServiceConfig codecServiceConfig) { + super(codecServiceConfig.getMapperService(), codecServiceConfig.getIndexSettings(), codecServiceConfig.getLogger()); + MapperService mapperService = codecServiceConfig.getMapperService(); + + final MapBuilder<String, Codec> codecs = MapBuilder.newMapBuilder(); + String accelerationMode = codecServiceConfig.getIndexSettings().getValue(Lucene99QatCodec.INDEX_CODEC_MODE_SETTING); + if (mapperService == null) { + codecs.put(Lucene99QatCodec.Mode.QDEFLATE.name(), new QatDeflateCodec(accelerationMode)); + codecs.put(Lucene99QatCodec.Mode.QLZ4.name(), new QatLz4Codec(accelerationMode)); + } else { + codecs.put( + Lucene99QatCodec.Mode.QDEFLATE.name(), + new PerFieldMappingPostingFormatCodec(Lucene99QatCodec.Mode.QDEFLATE, accelerationMode, mapperService) + ); + codecs.put( + Lucene99QatCodec.Mode.QLZ4.name(), + new PerFieldMappingPostingFormatCodec(Lucene99QatCodec.Mode.QLZ4, accelerationMode, mapperService) + ); + } + this.codecs = codecs.immutableMap(); + } + + @Override + public Codec codec(String name) { + Codec codec = codecs.get(name); // prefer the codecs built here: they carry the configured acceleration mode + if (codec == null) { + codec = super.codec(name); // every other name is resolved by the base service + } + return codec; + } + + @Override + public String[] availableCodecs() { + ArrayList<String> ac = new ArrayList<>(Arrays.asList(super.availableCodecs())); + ac.addAll(codecs.keySet()); + return ac.toArray(new String[0]); + } +} diff --git a/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/CustomCodecServiceFactory.java b/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/CustomCodecServiceFactory.java new file mode 100644 index 0000000000000..220acd01a5938 --- /dev/null +++ 
b/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/CustomCodecServiceFactory.java @@ -0,0 +1,27 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.customcodecs; + +import org.opensearch.index.codec.CodecService; +import org.opensearch.index.codec.CodecServiceConfig; +import org.opensearch.index.codec.CodecServiceFactory; + +/** + * A {@link CodecServiceFactory} that creates a new {@link CustomCodecService} for each call, returned as a {@link CodecService}. + */ +public class CustomCodecServiceFactory implements CodecServiceFactory { + + /** Creates a new instance. */ + public CustomCodecServiceFactory() {} + + @Override + public CodecService createCodecService(CodecServiceConfig config) { + return new CustomCodecService(config); + } +} diff --git a/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/Lucene99QatCodec.java b/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/Lucene99QatCodec.java new file mode 100644 index 0000000000000..34bd5f4e377d6 --- /dev/null +++ b/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/Lucene99QatCodec.java @@ -0,0 +1,73 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.index.codec.customcodecs; + +import org.apache.lucene.codecs.FilterCodec; +import org.apache.lucene.codecs.StoredFieldsFormat; +import org.apache.lucene.codecs.lucene99.Lucene99Codec; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Setting.Property; + +import com.intel.qat.QatZipper; + +import static com.intel.qat.QatZipper.Algorithm; +import static com.intel.qat.QatZipper.PollingMode; + +abstract class Lucene99QatCodec extends FilterCodec { + + public static QatZipper getCompressor(Algorithm algorithm, int level, QatZipper.Mode mode, PollingMode pmode) { + return new QatZipper(algorithm, level, mode, pmode); + } + + public static QatZipper getCompressor(Algorithm algorithm, QatZipper.Mode mode, PollingMode pmode) { + return new QatZipper(algorithm, mode, pmode); + } + + public static final int DEFAULT_COMPRESSION_LEVEL = 6; + + /** Each mode represents a compression algorithm. */ + public enum Mode { + QDEFLATE, + QLZ4 + } + + public static final Setting<String> INDEX_CODEC_MODE_SETTING = new Setting<>("index.codec.mode", "hardware", s -> { + switch (s) { + case "auto": + case "hardware": + return s; + default: + throw new IllegalArgumentException("unknown value for [index.codec.mode] must be one of [auto, hardware] but was: " + s); + } + }, Property.IndexScope, Property.NodeScope); + + private final StoredFieldsFormat storedFieldsFormat; + + /** + * Creates a new codec for the given compression mode and acceleration mode, using the default compression level. + */ + public Lucene99QatCodec(Mode mode, String accelerationMode) { + this(mode, DEFAULT_COMPRESSION_LEVEL, accelerationMode); + } + + public Lucene99QatCodec(Mode mode, int compressionLevel, String accelerationMode) { + super(mode.name(), new Lucene99Codec()); // the codec name is the mode name; everything but stored fields is delegated to Lucene99Codec + this.storedFieldsFormat = new Lucene99QatStoredFieldsFormat(mode, compressionLevel, accelerationMode); + } + + @Override + public StoredFieldsFormat storedFieldsFormat() { + return storedFieldsFormat; + } + + @Override + 
public String toString() { + return getClass().getSimpleName(); + } +} diff --git a/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/Lucene99QatStoredFieldsFormat.java b/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/Lucene99QatStoredFieldsFormat.java new file mode 100644 index 0000000000000..a5405ba55a526 --- /dev/null +++ b/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/Lucene99QatStoredFieldsFormat.java @@ -0,0 +1,112 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.customcodecs; + +import org.apache.lucene.codecs.StoredFieldsFormat; +import org.apache.lucene.codecs.StoredFieldsReader; +import org.apache.lucene.codecs.StoredFieldsWriter; +import org.apache.lucene.codecs.compressing.CompressionMode; +import org.apache.lucene.codecs.lucene90.compressing.Lucene90CompressingStoredFieldsFormat; +import org.apache.lucene.index.FieldInfos; +import org.apache.lucene.index.SegmentInfo; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IOContext; + +import java.io.IOException; +import java.util.Objects; + +/** Stored field format used by pluggable codec */ +public class Lucene99QatStoredFieldsFormat extends StoredFieldsFormat { + + /** A key that we use to map to a mode */ + public static final String MODE_KEY = Lucene99QatStoredFieldsFormat.class.getSimpleName() + ".mode"; + + private static final int DEFLATE_BLOCK_LENGTH = 10 * 48 * 1024; + private static final int DEFLATE_MAX_DOCS_PER_BLOCK = 4096; + private static final int LZ4_BLOCK_LENGTH = 10 * 8 * 1024; + private static final int LZ4_MAX_DOCS_PER_BLOCK = 4096; + private static final int BLOCK_SHIFT = 10; + + private final CompressionMode qatDeflateMode; + private final 
CompressionMode qatLz4Mode; + private final Lucene99QatCodec.Mode mode; + + /** + * Creates a new instance with the specified mode, compression level and acceleration mode. + * + * @param mode The mode represents QDEFLATE or QLZ4 + * @param compressionLevel The compression level for the mode. + * @param accelerationMode The acceleration mode. + */ + public Lucene99QatStoredFieldsFormat(Lucene99QatCodec.Mode mode, int compressionLevel, String accelerationMode) { + this.mode = Objects.requireNonNull(mode); + qatDeflateMode = new QatDeflateMode(compressionLevel, accelerationMode); + qatLz4Mode = new QatLz4Mode(compressionLevel, accelerationMode); + } + + /** + * Returns a {@link StoredFieldsReader} to load stored fields. + * @param directory The index directory. + * @param si The SegmentInfo that stores segment information. + * @param fn The fieldInfos. + * @param context The IOContext that holds additional details on the + * merge/search context. 
+ */ + @Override + public StoredFieldsReader fieldsReader(Directory directory, SegmentInfo si, FieldInfos fn, IOContext context) throws IOException { + String value = si.getAttribute(MODE_KEY); + if (value == null) { + throw new IllegalStateException("missing value for " + MODE_KEY + " for segment: " + si.name); + } + Lucene99QatCodec.Mode segmentMode = Lucene99QatCodec.Mode.valueOf(value); // the mode recorded in the segment, not this instance's mode + return impl(segmentMode).fieldsReader(directory, si, fn, context); + } + + /** + * Returns a {@link StoredFieldsWriter} to write stored fields. + * @param directory The index directory. + * @param si The SegmentInfo that stores segment information. + * @param context The IOContext that holds additional details on the + * merge/search context. + */ + @Override + public StoredFieldsWriter fieldsWriter(Directory directory, SegmentInfo si, IOContext context) throws IOException { + String previous = si.putAttribute(MODE_KEY, mode.name()); // record the mode so fieldsReader can pick the matching format + if (previous != null && previous.equals(mode.name()) == false) { + throw new IllegalStateException( + "found existing value for " + MODE_KEY + " for segment: " + si.name + " old = " + previous + ", new = " + mode.name() + ); + } + return impl(mode).fieldsWriter(directory, si, context); + } + + private StoredFieldsFormat impl(Lucene99QatCodec.Mode mode) { + switch (mode) { + case QDEFLATE: + return new Lucene90CompressingStoredFieldsFormat( + "CustomStoredFieldsQatDeflate", + qatDeflateMode, + DEFLATE_BLOCK_LENGTH, + DEFLATE_MAX_DOCS_PER_BLOCK, + BLOCK_SHIFT + ); + + case QLZ4: + return new Lucene90CompressingStoredFieldsFormat( + "CustomStoredFieldsQatLz4", + qatLz4Mode, + LZ4_BLOCK_LENGTH, + LZ4_MAX_DOCS_PER_BLOCK, + BLOCK_SHIFT + ); + default: + throw new AssertionError(); + } + } +} diff --git a/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/PerFieldMappingPostingFormatCodec.java b/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/PerFieldMappingPostingFormatCodec.java new file mode 
100644 index 0000000000000..65170b2755dd6 --- /dev/null +++ b/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/PerFieldMappingPostingFormatCodec.java @@ -0,0 +1,29 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.customcodecs; + +import org.opensearch.index.mapper.MapperService; + +/** + * QAT codec variant that is constructed with a mapper service. Compare {@link + * org.opensearch.index.codec.PerFieldMappingPostingFormatCodec} + */ +public class PerFieldMappingPostingFormatCodec extends Lucene99QatCodec { + + /** + * Creates a new instance with the default compression level. + * + * @param compressionMode The compression mode (QDEFLATE or QLZ4). + * @param accelerationMode The acceleration mode. + * @param mapperService The mapper service (accepted but not used by this constructor). + */ + public PerFieldMappingPostingFormatCodec(Lucene99QatCodec.Mode compressionMode, String accelerationMode, MapperService mapperService) { + super(compressionMode, accelerationMode); + } +} diff --git a/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/QatDeflateCodec.java b/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/QatDeflateCodec.java new file mode 100644 index 0000000000000..b1dd74a7091f3 --- /dev/null +++ b/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/QatDeflateCodec.java @@ -0,0 +1,45 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.customcodecs; + +/** + * QatDeflateCodec provides a compressor using the qat-java library. 
+ */ +public class QatDeflateCodec extends Lucene99QatCodec { + + /** Creates a new QatDeflateCodec instance with the default compression level and the "hardware" acceleration mode. */ + public QatDeflateCodec() { + this(DEFAULT_COMPRESSION_LEVEL, "hardware"); + } + + /** + * Creates a new QatDeflateCodec instance with the default compression level. + * + * @param accelerationMode The acceleration mode. + */ + public QatDeflateCodec(String accelerationMode) { + this(DEFAULT_COMPRESSION_LEVEL, accelerationMode); + } + + /** + * Creates a new QatDeflateCodec instance that uses the QDEFLATE mode. + * + * @param compressionLevel The compression level. + * @param accelerationMode The acceleration mode. + */ + public QatDeflateCodec(int compressionLevel, String accelerationMode) { + super(Mode.QDEFLATE, compressionLevel, accelerationMode); + } + + @Override + public String toString() { + return getClass().getSimpleName(); + } +} diff --git a/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/QatDeflateMode.java b/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/QatDeflateMode.java new file mode 100644 index 0000000000000..bf909e783c6f6 --- /dev/null +++ b/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/QatDeflateMode.java @@ -0,0 +1,203 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.index.codec.customcodecs; + +import org.apache.lucene.codecs.compressing.CompressionMode; +import org.apache.lucene.codecs.compressing.Compressor; +import org.apache.lucene.codecs.compressing.Decompressor; +import org.apache.lucene.store.ByteBuffersDataInput; +import org.apache.lucene.store.DataInput; +import org.apache.lucene.store.DataOutput; +import org.apache.lucene.util.ArrayUtil; +import org.apache.lucene.util.BytesRef; + +import java.io.IOException; + +import com.intel.qat.QatZipper; + +/** QDEFLATE Compression Mode */ +public class QatDeflateMode extends CompressionMode { + + private static final int NUM_SUB_BLOCKS = 10; + private static final int DEFAULT_COMPRESSION_LEVEL = 6; + + private final int compressionLevel; + private final QatZipper.Mode qatMode; + + /** Creates a new instance with the default compression level. + * @param accelerationMode The acceleration mode ("hardware" or "auto"). + */ + protected QatDeflateMode(String accelerationMode) { + this.compressionLevel = DEFAULT_COMPRESSION_LEVEL; + this.qatMode = getMode(accelerationMode); + } + + static QatZipper.Mode getMode(String accelerationMode) { // maps the acceleration-mode string to the matching QatZipper.Mode + QatZipper.Mode mode; + switch (accelerationMode) { + case "hardware": + mode = QatZipper.Mode.HARDWARE; + break; + case "auto": + mode = QatZipper.Mode.AUTO; + break; + default: + throw new IllegalArgumentException("invalid acceleration mode [" + accelerationMode + "], must be one of [auto, hardware]"); + } + return mode; + } + + /** + * Creates a new instance. + * + * @param compressionLevel The compression level to use. + * @param accelerationMode The acceleration mode ("hardware" or "auto"). 
+ */ + protected QatDeflateMode(int compressionLevel, String accelerationMode) { + this.compressionLevel = compressionLevel; + this.qatMode = getMode(accelerationMode); + } + + @Override + public Compressor newCompressor() { + return new QatCompressor(compressionLevel, qatMode); + } + + @Override + public Decompressor newDecompressor() { + return new QatDecompressor(qatMode); + } + + /** QAT DEFLATE compressor */ + private static final class QatCompressor extends Compressor { + + private byte[] compressedBuffer; + + private final QatZipper qatZipper; + + /** Creates a compressor with the given compression level and QAT mode. */ + public QatCompressor(int compressionLevel, QatZipper.Mode mode) { + compressedBuffer = BytesRef.EMPTY_BYTES; + qatZipper = Lucene99QatCodec.getCompressor( + QatZipper.Algorithm.DEFLATE, + compressionLevel, + mode, + QatZipper.PollingMode.PERIODICAL + ); + } + + private void compress(byte[] bytes, int offset, int length, DataOutput out) throws IOException { + assert offset >= 0 : "Offset value must be greater than or equal to 0."; + + int blockLength = (length + NUM_SUB_BLOCKS - 1) / NUM_SUB_BLOCKS; // ceil(length / NUM_SUB_BLOCKS) + out.writeVInt(blockLength); // header: read back first by the decompressor + + final int end = offset + length; + assert end >= 0 : "Buffer read size must be greater than or equal to 0."; + + for (int start = offset; start < end; start += blockLength) { + int l = Math.min(blockLength, end - start); + + if (l == 0) { + out.writeVInt(0); + return; + } + + final int maxCompressedLength = qatZipper.maxCompressedLength(l); + compressedBuffer = ArrayUtil.grow(compressedBuffer, maxCompressedLength); + + int compressedSize = qatZipper.compress(bytes, start, l, compressedBuffer, 0, compressedBuffer.length); + out.writeVInt(compressedSize); + out.writeBytes(compressedBuffer, compressedSize); + } + } + + @Override + public void compress(ByteBuffersDataInput buffersInput, DataOutput out) throws IOException { + final int length = (int) buffersInput.size(); + byte[] bytes = new byte[length]; + buffersInput.readBytes(bytes, 0, length); + compress(bytes, 0, 
length, out); + } + + @Override + public void close() throws IOException {} // NOTE(review): the QatZipper created by this compressor is not released here; confirm whether qat-java needs an explicit release + } + + /** QAT DEFLATE decompressor */ + private static final class QatDecompressor extends Decompressor { + + private byte[] compressed; + private final QatZipper qatZipper; + private final QatZipper.Mode qatMode; + + /** Creates a decompressor that uses the given QAT mode. */ + public QatDecompressor(QatZipper.Mode mode) { + compressed = BytesRef.EMPTY_BYTES; + qatZipper = Lucene99QatCodec.getCompressor(QatZipper.Algorithm.DEFLATE, mode, QatZipper.PollingMode.PERIODICAL); + this.qatMode = mode; + } + + /* Decompresses the sub-blocks that cover [offset, offset + length) and exposes that window through bytes. */ + @Override + public void decompress(DataInput in, int originalLength, int offset, int length, BytesRef bytes) throws IOException { + assert offset + length <= originalLength : "Buffer read size must be within limit."; + + if (length == 0) { + bytes.length = 0; + return; + } + + final int blockLength = in.readVInt(); + bytes.offset = bytes.length = 0; + int offsetInBlock = 0; + int offsetInBytesRef = offset; + + // Skip unneeded blocks + while (offsetInBlock + blockLength < offset) { + final int compressedLength = in.readVInt(); + in.skipBytes(compressedLength); + offsetInBlock += blockLength; + offsetInBytesRef -= blockLength; + } + + // Read blocks that intersect with the interval we need + while (offsetInBlock < offset + length) { + bytes.bytes = ArrayUtil.grow(bytes.bytes, bytes.length + blockLength); + final int compressedLength = in.readVInt(); + if (compressedLength == 0) { + return; + } + compressed = ArrayUtil.grow(compressed, compressedLength); + in.readBytes(compressed, 0, compressedLength); + + int l = Math.min(blockLength, originalLength - offsetInBlock); + bytes.bytes = ArrayUtil.grow(bytes.bytes, bytes.length + l); + + // Decompress straight into the destination array at the current write + // position. This avoids allocating a temporary buffer and copying it + // for every block. + final int uncompressed = qatZipper.decompress(compressed, 0, compressedLength, bytes.bytes, bytes.length, l); + + bytes.length += uncompressed; + offsetInBlock += blockLength; + } + + 
bytes.offset = offsetInBytesRef; // expose only the requested window of the decompressed data + bytes.length = length; + + assert bytes.isValid() : "Decompression output is corrupted."; + } + + @Override + public Decompressor clone() { + return new QatDecompressor(qatMode); + } + } +} diff --git a/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/QatLz4Codec.java b/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/QatLz4Codec.java new file mode 100644 index 0000000000000..5426b3bea7781 --- /dev/null +++ b/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/QatLz4Codec.java @@ -0,0 +1,44 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.customcodecs; + +/** + * QatLz4Codec provides a compressor using the qat-java library. + */ +public class QatLz4Codec extends Lucene99QatCodec { + + /** default constructor */ + public QatLz4Codec() { + this(DEFAULT_COMPRESSION_LEVEL, "hardware"); + } + + /** + * Creates a new QatLz4Codec instance with the default compression level. + * @param accelerationMode The acceleration mode. + */ + public QatLz4Codec(String accelerationMode) { + this(DEFAULT_COMPRESSION_LEVEL, accelerationMode); + } + + /** + * Creates a new QatLz4Codec instance. + * + * @param compressionLevel The compression level. + * @param accelerationMode The acceleration mode. 
+ */ + public QatLz4Codec(int compressionLevel, String accelerationMode) { + super(Mode.QLZ4, compressionLevel, accelerationMode); + } + + @Override + public String toString() { + return getClass().getSimpleName(); + } +} diff --git a/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/QatLz4Mode.java b/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/QatLz4Mode.java new file mode 100644 index 0000000000000..a32a44faa97b1 --- /dev/null +++ b/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/QatLz4Mode.java @@ -0,0 +1,183 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.customcodecs; + +import org.apache.lucene.codecs.compressing.CompressionMode; +import org.apache.lucene.codecs.compressing.Compressor; +import org.apache.lucene.codecs.compressing.Decompressor; +import org.apache.lucene.store.ByteBuffersDataInput; +import org.apache.lucene.store.DataInput; +import org.apache.lucene.store.DataOutput; +import org.apache.lucene.util.ArrayUtil; +import org.apache.lucene.util.BytesRef; + +import java.io.IOException; + +import com.intel.qat.QatZipper; + +/** QLZ4 Compression Mode */ +public class QatLz4Mode extends CompressionMode { + + private static final int NUM_SUB_BLOCKS = 10; + private static final int DEFAULT_COMPRESSION_LEVEL = 6; + + private final int compressionLevel; + private final QatZipper.Mode qatMode; + + /** Creates a new instance with the default compression level. + * @param accelerationMode The acceleration mode ("hardware" or "auto"). + */ + protected QatLz4Mode(String accelerationMode) { + this.compressionLevel = DEFAULT_COMPRESSION_LEVEL; + this.qatMode = QatDeflateMode.getMode(accelerationMode); + } + + /** + * Creates a new instance. + * + * @param compressionLevel The compression level to use. 
+ * @param accelerationMode The acceleration mode ("hardware" or "auto"). + */ + protected QatLz4Mode(int compressionLevel, String accelerationMode) { + this.compressionLevel = compressionLevel; + this.qatMode = QatDeflateMode.getMode(accelerationMode); + } + + @Override + public Compressor newCompressor() { + return new QatCompressor(compressionLevel, qatMode); + } + + @Override + public Decompressor newDecompressor() { + return new QatDecompressor(qatMode); + } + + /** QAT LZ4 compressor */ + private static final class QatCompressor extends Compressor { + + private byte[] compressedBuffer; + + private final QatZipper qatZipper; + + /** Creates a compressor with the given compression level and QAT mode. */ + public QatCompressor(int compressionLevel, QatZipper.Mode mode) { + compressedBuffer = BytesRef.EMPTY_BYTES; + qatZipper = Lucene99QatCodec.getCompressor(QatZipper.Algorithm.LZ4, compressionLevel, mode, QatZipper.PollingMode.PERIODICAL); + } + + private void compress(byte[] bytes, int offset, int length, DataOutput out) throws IOException { + assert offset >= 0 : "Offset value must be greater than or equal to 0."; + + int blockLength = (length + NUM_SUB_BLOCKS - 1) / NUM_SUB_BLOCKS; // ceil(length / NUM_SUB_BLOCKS) + out.writeVInt(blockLength); // header: read back first by the decompressor + + final int end = offset + length; + assert end >= 0 : "Buffer read size must be greater than or equal to 0."; + + for (int start = offset; start < end; start += blockLength) { + int l = Math.min(blockLength, end - start); + + if (l == 0) { + out.writeVInt(0); + return; + } + + final int maxCompressedLength = qatZipper.maxCompressedLength(l); + compressedBuffer = ArrayUtil.grow(compressedBuffer, maxCompressedLength); + + int compressedSize = qatZipper.compress(bytes, start, l, compressedBuffer, 0, compressedBuffer.length); + out.writeVInt(compressedSize); + out.writeBytes(compressedBuffer, compressedSize); + } + } + + @Override + public void compress(ByteBuffersDataInput buffersInput, DataOutput out) throws IOException { + final int length = (int) buffersInput.size(); + byte[] bytes = new byte[length]; + 
buffersInput.readBytes(bytes, 0, length); + compress(bytes, 0, length, out); + } + + @Override + public void close() throws IOException {} // NOTE(review): the QatZipper created by this compressor is not released here; confirm whether qat-java needs an explicit release + } + + /** QAT LZ4 decompressor */ + private static final class QatDecompressor extends Decompressor { + + private byte[] compressed; + private final QatZipper qatZipper; + private final QatZipper.Mode qatMode; + + /** Creates a decompressor that uses the given QAT mode. */ + public QatDecompressor(QatZipper.Mode mode) { + compressed = BytesRef.EMPTY_BYTES; + qatZipper = Lucene99QatCodec.getCompressor(QatZipper.Algorithm.LZ4, mode, QatZipper.PollingMode.PERIODICAL); + qatMode = mode; + } + + /* Decompresses the sub-blocks that cover [offset, offset + length) and exposes that window through bytes. */ + @Override + public void decompress(DataInput in, int originalLength, int offset, int length, BytesRef bytes) throws IOException { + assert offset + length <= originalLength : "Buffer read size must be within limit."; + + if (length == 0) { + bytes.length = 0; + return; + } + + final int blockLength = in.readVInt(); + bytes.offset = bytes.length = 0; + int offsetInBlock = 0; + int offsetInBytesRef = offset; + + // Skip unneeded blocks + while (offsetInBlock + blockLength < offset) { + final int compressedLength = in.readVInt(); + in.skipBytes(compressedLength); + offsetInBlock += blockLength; + offsetInBytesRef -= blockLength; + } + + // Read blocks that intersect with the interval we need + while (offsetInBlock < offset + length) { + bytes.bytes = ArrayUtil.grow(bytes.bytes, bytes.length + blockLength); + final int compressedLength = in.readVInt(); + if (compressedLength == 0) { + return; + } + compressed = ArrayUtil.grow(compressed, compressedLength); + in.readBytes(compressed, 0, compressedLength); + + int l = Math.min(blockLength, originalLength - offsetInBlock); + bytes.bytes = ArrayUtil.grow(bytes.bytes, bytes.length + l); + + // Decompress straight into the destination array at the current write + // position. This avoids allocating a temporary buffer and copying it + // for every block. + final int uncompressed = qatZipper.decompress(compressed, 0, compressedLength, bytes.bytes, bytes.length, l); + + 
bytes.length += uncompressed; + offsetInBlock += blockLength; + } + + bytes.offset = offsetInBytesRef; // expose only the requested window of the decompressed data + bytes.length = length; + + assert bytes.isValid() : "Decompression output is corrupted."; + } + + @Override + public Decompressor clone() { + return new QatDecompressor(qatMode); + } + } +} diff --git a/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/package-info.java b/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/package-info.java new file mode 100644 index 0000000000000..e996873963b1b --- /dev/null +++ b/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * A plugin that implements compression codecs with native implementation. + */ +package org.opensearch.index.codec.customcodecs; diff --git a/sandbox/modules/custom-codecs/src/main/plugin-metadata/plugin-security.policy b/sandbox/modules/custom-codecs/src/main/plugin-metadata/plugin-security.policy new file mode 100644 index 0000000000000..6d3fe80db5c0f --- /dev/null +++ b/sandbox/modules/custom-codecs/src/main/plugin-metadata/plugin-security.policy @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +grant codeBase "${codebase.qat-java}" { + permission java.lang.RuntimePermission "loadLibrary.*"; + permission org.opensearch.secure_sm.ThreadPermission "modifyArbitraryThread"; +}; diff --git a/sandbox/modules/custom-codecs/src/main/resources/META-INF/services/org.apache.lucene.codecs.Codec b/sandbox/modules/custom-codecs/src/main/resources/META-INF/services/org.apache.lucene.codecs.Codec new file mode 100644 index 0000000000000..92ef57549500a --- /dev/null +++ b/sandbox/modules/custom-codecs/src/main/resources/META-INF/services/org.apache.lucene.codecs.Codec @@ -0,0 +1,2 @@ +org.opensearch.index.codec.customcodecs.QatDeflateCodec +org.opensearch.index.codec.customcodecs.QatLz4Codec diff --git a/sandbox/modules/custom-codecs/src/test/java/org/opensearch/index/codec/customcodecs/AbstractCompressorTests.java b/sandbox/modules/custom-codecs/src/test/java/org/opensearch/index/codec/customcodecs/AbstractCompressorTests.java new file mode 100644 index 0000000000000..cc794eb2c48f1 --- /dev/null +++ b/sandbox/modules/custom-codecs/src/test/java/org/opensearch/index/codec/customcodecs/AbstractCompressorTests.java @@ -0,0 +1,219 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.index.codec.customcodecs; + +import org.apache.lucene.codecs.compressing.Compressor; +import org.apache.lucene.codecs.compressing.Decompressor; +import org.apache.lucene.store.ByteArrayDataInput; +import org.apache.lucene.store.ByteBuffersDataInput; +import org.apache.lucene.store.ByteBuffersDataOutput; +import org.apache.lucene.tests.util.LineFileDocs; +import org.apache.lucene.tests.util.TestUtil; +import org.apache.lucene.util.BytesRef; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Random; + +/** + * Test cases for compressors (based on {@See org.opensearch.common.compress.DeflateCompressTests}). + */ +public abstract class AbstractCompressorTests extends OpenSearchTestCase { + + abstract Compressor compressor(); + + abstract Decompressor decompressor(); + + public void testEmpty() throws IOException { + final byte[] bytes = "".getBytes(StandardCharsets.UTF_8); + doTest(bytes); + } + + public void testShortLiterals() throws IOException { + final byte[] bytes = "1234567345673456745608910123".getBytes(StandardCharsets.UTF_8); + doTest(bytes); + } + + public void testRandom() throws IOException { + Random r = random(); + for (int i = 0; i < 10; i++) { + final byte[] bytes = new byte[TestUtil.nextInt(r, 1, 100000)]; + r.nextBytes(bytes); + doTest(bytes); + } + } + + public void testLineDocs() throws IOException { + Random r = random(); + LineFileDocs lineFileDocs = new LineFileDocs(r); + for (int i = 0; i < 10; i++) { + int numDocs = TestUtil.nextInt(r, 1, 200); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + for (int j = 0; j < numDocs; j++) { + String s = lineFileDocs.nextDoc().get("body"); + bos.write(s.getBytes(StandardCharsets.UTF_8)); + } + doTest(bos.toByteArray()); + } + lineFileDocs.close(); + } + + public void testRepetitionsL() 
throws IOException { + Random r = random(); + for (int i = 0; i < 10; i++) { + int numLongs = TestUtil.nextInt(r, 1, 10000); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + long theValue = r.nextLong(); + for (int j = 0; j < numLongs; j++) { + if (r.nextInt(10) == 0) { + theValue = r.nextLong(); + } + bos.write((byte) (theValue >>> 56)); + bos.write((byte) (theValue >>> 48)); + bos.write((byte) (theValue >>> 40)); + bos.write((byte) (theValue >>> 32)); + bos.write((byte) (theValue >>> 24)); + bos.write((byte) (theValue >>> 16)); + bos.write((byte) (theValue >>> 8)); + bos.write((byte) theValue); + } + doTest(bos.toByteArray()); + } + } + + public void testRepetitionsI() throws IOException { + Random r = random(); + for (int i = 0; i < 10; i++) { + int numInts = TestUtil.nextInt(r, 1, 20000); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + int theValue = r.nextInt(); + for (int j = 0; j < numInts; j++) { + if (r.nextInt(10) == 0) { + theValue = r.nextInt(); + } + bos.write((byte) (theValue >>> 24)); + bos.write((byte) (theValue >>> 16)); + bos.write((byte) (theValue >>> 8)); + bos.write((byte) theValue); + } + doTest(bos.toByteArray()); + } + } + + public void testRepetitionsS() throws IOException { + Random r = random(); + for (int i = 0; i < 10; i++) { + int numShorts = TestUtil.nextInt(r, 1, 40000); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + short theValue = (short) r.nextInt(65535); + for (int j = 0; j < numShorts; j++) { + if (r.nextInt(10) == 0) { + theValue = (short) r.nextInt(65535); + } + bos.write((byte) (theValue >>> 8)); + bos.write((byte) theValue); + } + doTest(bos.toByteArray()); + } + } + + public void testMixed() throws IOException { + Random r = random(); + LineFileDocs lineFileDocs = new LineFileDocs(r); + for (int i = 0; i < 2; ++i) { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + int prevInt = r.nextInt(); + long prevLong = r.nextLong(); + while (bos.size() < 400000) { + switch 
(r.nextInt(4)) { + case 0: + addInt(r, prevInt, bos); + break; + case 1: + addLong(r, prevLong, bos); + break; + case 2: + addString(lineFileDocs, bos); + break; + case 3: + addBytes(r, bos); + break; + default: + throw new IllegalStateException("Random is broken"); + } + } + doTest(bos.toByteArray()); + } + } + + private void addLong(Random r, long prev, ByteArrayOutputStream bos) { + long theValue = prev; + if (r.nextInt(10) != 0) { + theValue = r.nextLong(); + } + bos.write((byte) (theValue >>> 56)); + bos.write((byte) (theValue >>> 48)); + bos.write((byte) (theValue >>> 40)); + bos.write((byte) (theValue >>> 32)); + bos.write((byte) (theValue >>> 24)); + bos.write((byte) (theValue >>> 16)); + bos.write((byte) (theValue >>> 8)); + bos.write((byte) theValue); + } + + private void addInt(Random r, int prev, ByteArrayOutputStream bos) { + int theValue = prev; + if (r.nextInt(10) != 0) { + theValue = r.nextInt(); + } + bos.write((byte) (theValue >>> 24)); + bos.write((byte) (theValue >>> 16)); + bos.write((byte) (theValue >>> 8)); + bos.write((byte) theValue); + } + + private void addString(LineFileDocs lineFileDocs, ByteArrayOutputStream bos) throws IOException { + String s = lineFileDocs.nextDoc().get("body"); + bos.write(s.getBytes(StandardCharsets.UTF_8)); + } + + private void addBytes(Random r, ByteArrayOutputStream bos) throws IOException { + byte bytes[] = new byte[TestUtil.nextInt(r, 1, 10000)]; + r.nextBytes(bytes); + bos.write(bytes); + } + + private void doTest(byte[] bytes) throws IOException { + final int length = bytes.length; + + ByteBuffersDataInput in = new ByteBuffersDataInput(List.of(ByteBuffer.wrap(bytes))); + ByteBuffersDataOutput out = new ByteBuffersDataOutput(); + + // let's compress + Compressor compressor = compressor(); + compressor.compress(in, out); + byte[] compressed = out.toArrayCopy(); + + // let's decompress + BytesRef outbytes = new BytesRef(); + Decompressor decompressor = decompressor(); + decompressor.decompress(new 
ByteArrayDataInput(compressed), length, 0, length, outbytes); + + // get the uncompressed array out of outbytes + byte[] restored = new byte[outbytes.length]; + System.arraycopy(outbytes.bytes, 0, restored, 0, outbytes.length); + + assertArrayEquals(bytes, restored); + } + +} diff --git a/sandbox/modules/custom-codecs/src/test/java/org/opensearch/index/codec/customcodecs/QatDeflateTests.java b/sandbox/modules/custom-codecs/src/test/java/org/opensearch/index/codec/customcodecs/QatDeflateTests.java new file mode 100644 index 0000000000000..9dd2a7888c5c1 --- /dev/null +++ b/sandbox/modules/custom-codecs/src/test/java/org/opensearch/index/codec/customcodecs/QatDeflateTests.java @@ -0,0 +1,30 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.index.codec.customcodecs; + +import org.apache.lucene.codecs.compressing.Compressor; +import org.apache.lucene.codecs.compressing.Decompressor; + +/** + * Test QATDEFLATE compression + */ +public class QatDeflateTests extends AbstractCompressorTests { + + private final Compressor compressor = new QatDeflateMode("auto").newCompressor(); + private final Decompressor decompressor = new QatDeflateMode("auto").newDecompressor(); + + @Override + Compressor compressor() { + return compressor; + } + + @Override + Decompressor decompressor() { + return decompressor; + } +} diff --git a/sandbox/modules/custom-codecs/src/test/java/org/opensearch/index/codec/customcodecs/QatLz4Tests.java b/sandbox/modules/custom-codecs/src/test/java/org/opensearch/index/codec/customcodecs/QatLz4Tests.java new file mode 100644 index 0000000000000..6e2a31fdc75af --- /dev/null +++ b/sandbox/modules/custom-codecs/src/test/java/org/opensearch/index/codec/customcodecs/QatLz4Tests.java @@ -0,0 +1,30 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors 
require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.index.codec.customcodecs; + +import org.apache.lucene.codecs.compressing.Compressor; +import org.apache.lucene.codecs.compressing.Decompressor; + +/** + * Test QATLZ4 compression (with no dictionary). + */ +public class QatLz4Tests extends AbstractCompressorTests { + + private final Compressor compressor = new QatLz4Mode("auto").newCompressor(); + private final Decompressor decompressor = new QatLz4Mode("auto").newDecompressor(); + + @Override + Compressor compressor() { + return compressor; + } + + @Override + Decompressor decompressor() { + return decompressor; + } +}