From 4df347c6f904942b073ee4bc76ec87a095cee4c7 Mon Sep 17 00:00:00 2001 From: Andrey Pleskach Date: Thu, 1 Jun 2023 16:40:10 +0200 Subject: [PATCH] Add ZSTD compression for snapshotting (#2996) Changes: - Added ZSTD compressor for snapshotting - 2 JSON repository settings: - readonly - compression were moved into the BlobStoreRepository class and removed from other repos classes where they were used. Signed-off-by: Andrey Pleskach --- CHANGELOG.md | 2 + buildSrc/version.properties | 3 + distribution/tools/plugin-cli/build.gradle | 3 - modules/transport-netty4/build.gradle | 1 - .../repositories/azure/AzureRepository.java | 8 +- .../gcs/GoogleCloudStorageRepository.java | 11 +- .../repositories/hdfs/HdfsRepository.java | 2 +- .../repositories/s3/S3Repository.java | 6 - plugins/transport-nio/build.gradle | 1 - sandbox/plugins/custom-codecs/build.gradle | 2 +- .../licenses/zstd-jni-1.5.5-1.jar.sha1 | 1 - server/build.gradle | 3 + server/licenses/zstd-jni-1.5.5-3.jar.sha1 | 1 + .../licenses/zstd-jni-LICENSE.txt | 0 .../licenses/zstd-jni-NOTICE.txt | 0 .../common/compress/CompressorFactory.java | 13 +- .../common/compress/CompressorType.java | 40 ++ .../common/compress/DeflateCompressor.java | 29 +- .../common/compress/NoneCompressor.java | 53 +++ .../common/compress/ZstdCompressor.java | 81 ++++ .../blobstore/BlobStoreRepository.java | 43 +- .../blobstore/ChecksumBlobStoreFormat.java | 12 +- .../repositories/fs/FsRepository.java | 1 - .../org/opensearch/bootstrap/security.policy | 6 + .../compress/AbstractCompressorTests.java | 408 ++++++++++++++++++ .../common/compress/DeflateCompressTests.java | 396 +---------------- .../common/compress/ZstdCompressTests.java | 22 + .../snapshots/BlobStoreFormatTests.java | 24 +- ...earchBlobStoreRepositoryIntegTestCase.java | 9 +- .../AbstractSnapshotIntegTestCase.java | 7 +- 30 files changed, 729 insertions(+), 459 deletions(-) delete mode 100644 sandbox/plugins/custom-codecs/licenses/zstd-jni-1.5.5-1.jar.sha1 create mode 100644 server/licenses/zstd-jni-1.5.5-3.jar.sha1 rename {sandbox/plugins/custom-codecs => server}/licenses/zstd-jni-LICENSE.txt (100%) rename {sandbox/plugins/custom-codecs => server}/licenses/zstd-jni-NOTICE.txt (100%) create mode 100644 server/src/main/java/org/opensearch/common/compress/CompressorType.java create mode 100644 server/src/main/java/org/opensearch/common/compress/NoneCompressor.java create mode 100644 server/src/main/java/org/opensearch/common/compress/ZstdCompressor.java create mode 100644 server/src/test/java/org/opensearch/common/compress/AbstractCompressorTests.java create mode 100644 server/src/test/java/org/opensearch/common/compress/ZstdCompressTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 79edf31b32740..93f21e2d8462f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -41,6 +41,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Bump `com.netflix.nebula:nebula-publishing-plugin` from 19.2.0 to 20.3.0 - Bump `com.diffplug.spotless` from 6.17.0 to 6.18.0 - Bump `io.opencensus:opencensus-api` from 0.18.0 to 0.31.1 ([#7291](https://github.com/opensearch-project/OpenSearch/pull/7291)) +- Add `com.github.luben:zstd-jni` version 1.5.5-3 ([#2996](https://github.com/opensearch-project/OpenSearch/pull/2996)) ### Changed - [CCR] Add getHistoryOperationsFromTranslog method to fetch the history snapshot from translogs ([#3948](https://github.com/opensearch-project/OpenSearch/pull/3948)) @@ -50,6 +51,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Change http code for DecommissioningFailedException from 500 to 400 ([#5283](https://github.com/opensearch-project/OpenSearch/pull/5283)) - Improve summary error message for invalid setting updates ([#4792](https://github.com/opensearch-project/OpenSearch/pull/4792)) - Reduce memory copy in zstd compression ([#7681](https://github.com/opensearch-project/OpenSearch/pull/7681)) +- Add ZSTD compression for snapshotting ([#2996](https://github.com/opensearch-project/OpenSearch/pull/2996)) ### Deprecated diff --git a/buildSrc/version.properties b/buildSrc/version.properties index c4810d8b048d8..5130f138c9132 100644 --- a/buildSrc/version.properties +++ b/buildSrc/version.properties @@ -57,3 +57,6 @@ bytebuddy = 1.14.3 # benchmark dependencies jmh = 1.35 + +# compression +zstd = 1.5.5-3 diff --git a/distribution/tools/plugin-cli/build.gradle b/distribution/tools/plugin-cli/build.gradle index 174c5cc76dfb7..894e4fc67e019 100644 --- a/distribution/tools/plugin-cli/build.gradle +++ b/distribution/tools/plugin-cli/build.gradle @@ -82,9 +82,6 @@ thirdPartyAudit.ignoreViolations( ) thirdPartyAudit.ignoreMissingClasses( - 'com.github.luben.zstd.BufferPool', - 'com.github.luben.zstd.ZstdInputStream', - 'com.github.luben.zstd.ZstdOutputStream', 'org.brotli.dec.BrotliInputStream', 'org.objectweb.asm.AnnotationVisitor', 'org.objectweb.asm.Attribute', diff --git a/modules/transport-netty4/build.gradle b/modules/transport-netty4/build.gradle index ba3ea044139a8..a4cfef14a2300 100644 --- a/modules/transport-netty4/build.gradle +++ b/modules/transport-netty4/build.gradle @@ -185,7 +185,6 @@ thirdPartyAudit { 'org.slf4j.LoggerFactory', 'org.slf4j.spi.LocationAwareLogger', - 'com.github.luben.zstd.Zstd', 'com.google.protobuf.nano.CodedOutputByteBufferNano', 'com.google.protobuf.nano.MessageNano', 'com.jcraft.jzlib.Deflater', diff --git a/plugins/repository-azure/src/main/java/org/opensearch/repositories/azure/AzureRepository.java b/plugins/repository-azure/src/main/java/org/opensearch/repositories/azure/AzureRepository.java index e3dbca1a02e93..2677604ecb622 100644 --- a/plugins/repository-azure/src/main/java/org/opensearch/repositories/azure/AzureRepository.java +++ b/plugins/repository-azure/src/main/java/org/opensearch/repositories/azure/AzureRepository.java @@ -100,8 +100,6 @@ public static final class Repository { MAX_CHUNK_SIZE, Property.NodeScope ); - public static final Setting COMPRESS_SETTING = Setting.boolSetting("compress", false, Property.NodeScope); - public static final Setting READONLY_SETTING = Setting.boolSetting("readonly", false, Property.NodeScope); } private final BlobPath basePath; @@ -118,7 +116,7 @@ public AzureRepository( ) { super( metadata, - Repository.COMPRESS_SETTING.get(metadata.settings()), + COMPRESS_SETTING.get(metadata.settings()), namedXContentRegistry, clusterService, recoverySettings, @@ -142,8 +140,8 @@ public AzureRepository( // If the user explicitly did not define a readonly value, we set it by ourselves depending on the location mode setting. // For secondary_only setting, the repository should be read only final LocationMode locationMode = Repository.LOCATION_MODE_SETTING.get(metadata.settings()); - if (Repository.READONLY_SETTING.exists(metadata.settings())) { - this.readonly = Repository.READONLY_SETTING.get(metadata.settings()); + if (READONLY_SETTING.exists(metadata.settings())) { + this.readonly = READONLY_SETTING.get(metadata.settings()); } else { this.readonly = locationMode == LocationMode.SECONDARY_ONLY; } diff --git a/plugins/repository-gcs/src/main/java/org/opensearch/repositories/gcs/GoogleCloudStorageRepository.java b/plugins/repository-gcs/src/main/java/org/opensearch/repositories/gcs/GoogleCloudStorageRepository.java index 0ff5527881545..a743ac72bdb8b 100644 --- a/plugins/repository-gcs/src/main/java/org/opensearch/repositories/gcs/GoogleCloudStorageRepository.java +++ b/plugins/repository-gcs/src/main/java/org/opensearch/repositories/gcs/GoogleCloudStorageRepository.java @@ -50,7 +50,6 @@ import java.util.function.Function; import static org.opensearch.common.settings.Setting.Property; -import static org.opensearch.common.settings.Setting.boolSetting; import static org.opensearch.common.settings.Setting.byteSizeSetting; import static org.opensearch.common.settings.Setting.simpleString; @@ -70,7 +69,6 @@ class GoogleCloudStorageRepository extends MeteredBlobStoreRepository { static final Setting BUCKET = simpleString("bucket", Property.NodeScope, Property.Dynamic); static final Setting BASE_PATH = simpleString("base_path", Property.NodeScope, Property.Dynamic); - static final Setting COMPRESS = boolSetting("compress", false, Property.NodeScope, Property.Dynamic); static final Setting CHUNK_SIZE = byteSizeSetting( "chunk_size", MAX_CHUNK_SIZE, @@ -94,7 +92,14 @@ class GoogleCloudStorageRepository extends MeteredBlobStoreRepository { final ClusterService clusterService, final RecoverySettings recoverySettings ) { - super(metadata, getSetting(COMPRESS, metadata), namedXContentRegistry, clusterService, recoverySettings, buildLocation(metadata)); + super( + metadata, + getSetting(COMPRESS_SETTING, metadata), + namedXContentRegistry, + clusterService, + recoverySettings, + buildLocation(metadata) + ); this.storageService = storageService; String basePath = BASE_PATH.get(metadata.settings()); diff --git a/plugins/repository-hdfs/src/main/java/org/opensearch/repositories/hdfs/HdfsRepository.java b/plugins/repository-hdfs/src/main/java/org/opensearch/repositories/hdfs/HdfsRepository.java index b4075620d0acb..88c58942e9bbf 100644 --- a/plugins/repository-hdfs/src/main/java/org/opensearch/repositories/hdfs/HdfsRepository.java +++ b/plugins/repository-hdfs/src/main/java/org/opensearch/repositories/hdfs/HdfsRepository.java @@ -83,7 +83,7 @@ public HdfsRepository( final ClusterService clusterService, final RecoverySettings recoverySettings ) { - super(metadata, metadata.settings().getAsBoolean("compress", false), namedXContentRegistry, clusterService, recoverySettings); + super(metadata, COMPRESS_SETTING.get(metadata.settings()), namedXContentRegistry, clusterService, recoverySettings); this.environment = environment; this.chunkSize = metadata.settings().getAsBytesSize("chunk_size", null); diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java index 117b4f009ec03..954b79035429f 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java @@ -156,12 +156,6 @@ class S3Repository extends MeteredBlobStoreRepository { new ByteSizeValue(5, ByteSizeUnit.TB) ); - /** - * When set to true metadata files are stored in compressed format. This setting doesn’t affect index - * files that are already compressed by default. Defaults to false. - */ - static final Setting COMPRESS_SETTING = Setting.boolSetting("compress", false); - /** * Sets the S3 storage class type for the backup files. Values may be standard, reduced_redundancy, * standard_ia, onezone_ia and intelligent_tiering. Defaults to standard. diff --git a/plugins/transport-nio/build.gradle b/plugins/transport-nio/build.gradle index 2e58cf3b87b3e..87e54bfed3b3d 100644 --- a/plugins/transport-nio/build.gradle +++ b/plugins/transport-nio/build.gradle @@ -111,7 +111,6 @@ thirdPartyAudit { 'org.slf4j.LoggerFactory', 'org.slf4j.spi.LocationAwareLogger', - 'com.github.luben.zstd.Zstd', 'com.google.protobuf.nano.CodedOutputByteBufferNano', 'com.google.protobuf.nano.MessageNano', 'com.jcraft.jzlib.Deflater', diff --git a/sandbox/plugins/custom-codecs/build.gradle b/sandbox/plugins/custom-codecs/build.gradle index 43b134c30da0f..2183df25044a4 100644 --- a/sandbox/plugins/custom-codecs/build.gradle +++ b/sandbox/plugins/custom-codecs/build.gradle @@ -21,7 +21,7 @@ opensearchplugin { } dependencies { - api "com.github.luben:zstd-jni:1.5.5-1" + api "com.github.luben:zstd-jni:${versions.zstd}" } yamlRestTest.enabled = false; diff --git a/sandbox/plugins/custom-codecs/licenses/zstd-jni-1.5.5-1.jar.sha1 b/sandbox/plugins/custom-codecs/licenses/zstd-jni-1.5.5-1.jar.sha1 deleted file mode 100644 index bfb9e565bc6d5..0000000000000 --- a/sandbox/plugins/custom-codecs/licenses/zstd-jni-1.5.5-1.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -fda1d6278299af27484e1cc3c79a060e41b7ef7e \ No newline at end of file diff --git a/server/build.gradle b/server/build.gradle index 4025a6657187e..7e75368323cf2 100644 --- a/server/build.gradle +++ b/server/build.gradle @@ -150,6 +150,9 @@ dependencies { api "com.google.protobuf:protobuf-java:${versions.protobuf}" api "jakarta.annotation:jakarta.annotation-api:${versions.jakarta_annotation}" + //zstd + api "com.github.luben:zstd-jni:${versions.zstd}" + testImplementation(project(":test:framework")) { // tests use the locally compiled version of server exclude group: 'org.opensearch', module: 'server' diff --git a/server/licenses/zstd-jni-1.5.5-3.jar.sha1 b/server/licenses/zstd-jni-1.5.5-3.jar.sha1 new file mode 100644 index 0000000000000..6d30ba7e2de80 --- /dev/null +++ b/server/licenses/zstd-jni-1.5.5-3.jar.sha1 @@ -0,0 +1 @@ +488dd9b15c9e8cf87d857f65f5cd6359c2853381 \ No newline at end of file diff --git a/sandbox/plugins/custom-codecs/licenses/zstd-jni-LICENSE.txt b/server/licenses/zstd-jni-LICENSE.txt similarity index 100% rename from sandbox/plugins/custom-codecs/licenses/zstd-jni-LICENSE.txt rename to server/licenses/zstd-jni-LICENSE.txt diff --git a/sandbox/plugins/custom-codecs/licenses/zstd-jni-NOTICE.txt b/server/licenses/zstd-jni-NOTICE.txt similarity index 100% rename from sandbox/plugins/custom-codecs/licenses/zstd-jni-NOTICE.txt rename to server/licenses/zstd-jni-NOTICE.txt diff --git a/server/src/main/java/org/opensearch/common/compress/CompressorFactory.java b/server/src/main/java/org/opensearch/common/compress/CompressorFactory.java index cb30f11fa4157..1d8f2dcec709a 100644 --- a/server/src/main/java/org/opensearch/common/compress/CompressorFactory.java +++ b/server/src/main/java/org/opensearch/common/compress/CompressorFactory.java @@ -47,7 +47,14 @@ */ public class CompressorFactory { - public static final Compressor COMPRESSOR = new DeflateCompressor(); + public static final Compressor DEFLATE_COMPRESSOR = new DeflateCompressor(); + + @Deprecated + public static final Compressor COMPRESSOR = DEFLATE_COMPRESSOR; + + public static final Compressor ZSTD_COMPRESSOR = new ZstdCompressor(); + + public static final Compressor NONE_COMPRESSOR = new NoneCompressor(); public static boolean isCompressed(BytesReference bytes) { return compressor(bytes) != null; @@ -61,6 +68,9 @@ public static Compressor compressor(BytesReference bytes) { // as a xcontent, we have a problem assert XContentHelper.xContentType(bytes) == null; return COMPRESSOR; + } else if (ZSTD_COMPRESSOR.isCompressed(bytes)) { + assert XContentHelper.xContentType(bytes) == null; + return ZSTD_COMPRESSOR; } XContentType contentType = XContentHelper.xContentType(bytes); @@ -81,7 +91,6 @@ private static boolean isAncient(BytesReference bytes) { /** * Uncompress the provided data, data can be detected as compressed using {@link #isCompressed(BytesReference)}. - * @throws NullPointerException a NullPointerException will be thrown when bytes is null */ public static BytesReference uncompressIfNeeded(BytesReference bytes) throws IOException { Compressor compressor = compressor(Objects.requireNonNull(bytes, "the BytesReference must not be null")); diff --git a/server/src/main/java/org/opensearch/common/compress/CompressorType.java b/server/src/main/java/org/opensearch/common/compress/CompressorType.java new file mode 100644 index 0000000000000..65453cd51848e --- /dev/null +++ b/server/src/main/java/org/opensearch/common/compress/CompressorType.java @@ -0,0 +1,40 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.compress; + +/** + * Supported compression types + * + * @opensearch.internal + */ +public enum CompressorType { + + DEFLATE { + @Override + public Compressor compressor() { + return CompressorFactory.DEFLATE_COMPRESSOR; + } + }, + + ZSTD { + @Override + public Compressor compressor() { + return CompressorFactory.ZSTD_COMPRESSOR; + } + }, + + NONE { + @Override + public Compressor compressor() { + return CompressorFactory.NONE_COMPRESSOR; + } + }; + + public abstract Compressor compressor(); +} diff --git a/server/src/main/java/org/opensearch/common/compress/DeflateCompressor.java b/server/src/main/java/org/opensearch/common/compress/DeflateCompressor.java index 36ee41e2d723d..1ab74b46a0c3d 100644 --- a/server/src/main/java/org/opensearch/common/compress/DeflateCompressor.java +++ b/server/src/main/java/org/opensearch/common/compress/DeflateCompressor.java @@ -157,16 +157,9 @@ public InputStream threadLocalInputStream(InputStream in) throws IOException { * @return decompressing stream */ public static InputStream inputStream(InputStream in, boolean threadLocal) throws IOException { - final byte[] headerBytes = new byte[HEADER.length]; - int len = 0; - while (len < headerBytes.length) { - final int read = in.read(headerBytes, len, headerBytes.length - len); - if (read == -1) { - break; - } - len += read; - } - if (len != HEADER.length || Arrays.equals(headerBytes, HEADER) == false) { + final byte[] header = in.readNBytes(HEADER.length); + + if (Arrays.equals(header, HEADER) == false) { throw new IllegalArgumentException("Input stream is not compressed with DEFLATE!"); } @@ -252,9 +245,11 @@ public BytesReference uncompress(BytesReference bytesReference) throws IOExcepti } finally { inflater.reset(); } - final BytesReference res = buffer.copyBytes(); - buffer.reset(); - return res; + try { + return buffer.copyBytes(); + } finally { + buffer.reset(); + } } // Reusable Deflater reference. Note: This is a separate instance from the one used for the compressing stream wrapper because we @@ -271,8 +266,10 @@ public BytesReference compress(BytesReference bytesReference) throws IOException } finally { deflater.reset(); } - final BytesReference res = buffer.copyBytes(); - buffer.reset(); - return res; + try { + return buffer.copyBytes(); + } finally { + buffer.reset(); + } } } diff --git a/server/src/main/java/org/opensearch/common/compress/NoneCompressor.java b/server/src/main/java/org/opensearch/common/compress/NoneCompressor.java new file mode 100644 index 0000000000000..d64645f689c67 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/compress/NoneCompressor.java @@ -0,0 +1,53 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.compress; + +import org.opensearch.common.bytes.BytesReference; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +/** + * {@link Compressor} no compressor implementation. + * + * @opensearch.internal + */ +public class NoneCompressor implements Compressor { + @Override + public boolean isCompressed(BytesReference bytes) { + return false; + } + + @Override + public int headerLength() { + return 0; + } + + @Override + public InputStream threadLocalInputStream(InputStream in) throws IOException { + return in; + } + + @Override + public OutputStream threadLocalOutputStream(OutputStream out) throws IOException { + return out; + } + + @Override + public BytesReference uncompress(BytesReference bytesReference) throws IOException { + return bytesReference; + } + + @Override + public BytesReference compress(BytesReference bytesReference) throws IOException { + return bytesReference; + } + +} diff --git a/server/src/main/java/org/opensearch/common/compress/ZstdCompressor.java b/server/src/main/java/org/opensearch/common/compress/ZstdCompressor.java new file mode 100644 index 0000000000000..26b4d7e8ab569 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/compress/ZstdCompressor.java @@ -0,0 +1,81 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.compress; + +import com.github.luben.zstd.RecyclingBufferPool; +import com.github.luben.zstd.ZstdInputStreamNoFinalizer; +import com.github.luben.zstd.ZstdOutputStreamNoFinalizer; +import org.opensearch.common.bytes.BytesReference; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Arrays; + +/** + * {@link Compressor} implementation based on the ZSTD compression algorithm. + * + * @opensearch.internal + */ +public class ZstdCompressor implements Compressor { + // An arbitrary header that we use to identify compressed streams + // It needs to be different from other compressors and to not be specific + // enough so that no stream starting with these bytes could be detected as + // a XContent + private static final byte[] HEADER = new byte[] { 'Z', 'S', 'T', 'D', '\0' }; + + private static final int LEVEL = 3; + + private static final int BUFFER_SIZE = 4096; + + @Override + public boolean isCompressed(BytesReference bytes) { + if (bytes.length() < HEADER.length) { + return false; + } + for (int i = 0; i < HEADER.length; ++i) { + if (bytes.get(i) != HEADER[i]) { + return false; + } + } + return true; + } + + @Override + public int headerLength() { + return HEADER.length; + } + + @Override + public InputStream threadLocalInputStream(InputStream in) throws IOException { + final byte[] header = in.readNBytes(HEADER.length); + if (Arrays.equals(header, HEADER) == false) { + throw new IllegalArgumentException("Input stream is not compressed with ZSTD!"); + } + return new ZstdInputStreamNoFinalizer(new BufferedInputStream(in, BUFFER_SIZE), RecyclingBufferPool.INSTANCE); + } + + @Override + public OutputStream threadLocalOutputStream(OutputStream out) throws IOException { + out.write(HEADER); + return new ZstdOutputStreamNoFinalizer(new BufferedOutputStream(out, BUFFER_SIZE), RecyclingBufferPool.INSTANCE, LEVEL); + } + + @Override + public BytesReference uncompress(BytesReference bytesReference) throws IOException { + throw new UnsupportedOperationException("ZSTD compression is supported only for snapshotting"); + } + + @Override + public BytesReference compress(BytesReference bytesReference) throws IOException { + throw new UnsupportedOperationException("ZSTD compression is supported only for snapshotting"); + } +} diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index d90c4f8c964d0..59694bb5b478f 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -79,7 +79,9 @@ import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.collect.Tuple; import org.opensearch.common.component.AbstractLifecycleComponent; +import org.opensearch.common.compress.Compressor; import org.opensearch.common.compress.CompressorFactory; +import org.opensearch.common.compress.CompressorType; import org.opensearch.common.compress.NotXContentException; import org.opensearch.common.io.Streams; import org.opensearch.common.lease.Releasable; @@ -140,6 +142,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -248,17 +251,31 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp Setting.Property.NodeScope ); + public static final Setting COMPRESS_SETTING = Setting.boolSetting("compress", false, Setting.Property.NodeScope); + + public static final Setting COMPRESSION_TYPE_SETTING = new Setting<>( + "compression_type", + CompressorType.DEFLATE.name().toLowerCase(Locale.ROOT), + s -> CompressorType.valueOf(s.toUpperCase(Locale.ROOT)), + Setting.Property.NodeScope + ); + /** * Setting to disable writing the {@code index.latest} blob which enables the contents of this repository to be used with a * url-repository. */ public static final Setting SUPPORT_URL_REPO = Setting.boolSetting("support_url_repo", true, Setting.Property.NodeScope); + /*** + * Setting to set repository as readonly + */ + public static final Setting READONLY_SETTING = Setting.boolSetting("readonly", false, Setting.Property.NodeScope); + protected final boolean supportURLRepo; private final int maxShardBlobDeleteBatch; - private final boolean compress; + private final Compressor compressor; private final boolean cacheRepositoryData; @@ -358,7 +375,6 @@ protected BlobStoreRepository( final ClusterService clusterService, final RecoverySettings recoverySettings ) { - this.compress = compress; this.metadata = metadata; this.namedXContentRegistry = namedXContentRegistry; this.threadPool = clusterService.getClusterApplierService().threadPool(); @@ -367,10 +383,11 @@ protected BlobStoreRepository( this.supportURLRepo = SUPPORT_URL_REPO.get(metadata.settings()); snapshotRateLimiter = getRateLimiter(metadata.settings(), "max_snapshot_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB)); restoreRateLimiter = getRateLimiter(metadata.settings(), "max_restore_bytes_per_sec", ByteSizeValue.ZERO); - readOnly = metadata.settings().getAsBoolean("readonly", false); + readOnly = READONLY_SETTING.get(metadata.settings()); cacheRepositoryData = CACHE_REPOSITORY_DATA.get(metadata.settings()); bufferSize = Math.toIntExact(BUFFER_SIZE_SETTING.get(metadata.settings()).getBytes()); maxShardBlobDeleteBatch = MAX_SNAPSHOT_SHARD_BLOB_DELETE_BATCH_SIZE.get(metadata.settings()); + this.compressor = compress ? COMPRESSION_TYPE_SETTING.get(metadata.settings()).compressor() : CompressorFactory.NONE_COMPRESSOR; } @Override @@ -539,13 +556,13 @@ public void cloneShardSnapshot( sourceMeta.asClone(target.getName(), startTime, threadPool.absoluteTimeInMillis() - startTime), shardContainer, target.getUUID(), - compress + compressor ); INDEX_SHARD_SNAPSHOTS_FORMAT.write( existingSnapshots.withClone(source.getName(), target.getName()), shardContainer, newGen, - compress + compressor ); return newGen; })); @@ -685,7 +702,7 @@ public BlobStore blobStore() { * @return true if compression is needed */ protected final boolean isCompress() { - return compress; + return compressor != CompressorFactory.NONE_COMPRESSOR; } /** @@ -1391,7 +1408,7 @@ public void finalizeSnapshot( executor.execute( ActionRunnable.run( allMetaListener, - () -> GLOBAL_METADATA_FORMAT.write(clusterMetadata, blobContainer(), snapshotId.getUUID(), compress) + () -> GLOBAL_METADATA_FORMAT.write(clusterMetadata, blobContainer(), snapshotId.getUUID(), compressor) ) ); @@ -1404,7 +1421,7 @@ public void finalizeSnapshot( if (metaUUID == null) { // We don't yet have this version of the metadata so we write it metaUUID = UUIDs.base64UUID(); - INDEX_METADATA_FORMAT.write(indexMetaData, indexContainer(index), metaUUID, compress); + INDEX_METADATA_FORMAT.write(indexMetaData, indexContainer(index), metaUUID, compressor); indexMetaIdentifiers.put(identifiers, metaUUID); } indexMetas.put(index, identifiers); @@ -1413,7 +1430,7 @@ public void finalizeSnapshot( executor.execute( ActionRunnable.run( allMetaListener, - () -> SNAPSHOT_FORMAT.write(snapshotInfo, blobContainer(), snapshotId.getUUID(), compress) + () -> SNAPSHOT_FORMAT.write(snapshotInfo, blobContainer(), snapshotId.getUUID(), compressor) ) ); }, onUpdateFailure); @@ -2423,7 +2440,7 @@ public void snapshotShard( // reference a generation that has not had all its files fully upload. indexGeneration = UUIDs.randomBase64UUID(); try { - INDEX_SHARD_SNAPSHOTS_FORMAT.write(updatedBlobStoreIndexShardSnapshots, shardContainer, indexGeneration, compress); + INDEX_SHARD_SNAPSHOTS_FORMAT.write(updatedBlobStoreIndexShardSnapshots, shardContainer, indexGeneration, compressor); } catch (IOException e) { throw new IndexShardSnapshotFailedException( shardId, @@ -2454,7 +2471,7 @@ public void snapshotShard( ), shardContainer, snapshotId.getUUID(), - compress + compressor ); } catch (IOException e) { throw new IndexShardSnapshotFailedException(shardId, "Failed to write commit point", e); @@ -2789,7 +2806,7 @@ private ShardSnapshotMetaDeleteResult deleteFromShardSnapshotMeta( final BlobStoreIndexShardSnapshots updatedSnapshots = new BlobStoreIndexShardSnapshots(newSnapshotsList); if (indexGeneration < 0L) { writtenGeneration = UUIDs.randomBase64UUID(); - INDEX_SHARD_SNAPSHOTS_FORMAT.write(updatedSnapshots, shardContainer, writtenGeneration, compress); + INDEX_SHARD_SNAPSHOTS_FORMAT.write(updatedSnapshots, shardContainer, writtenGeneration, compressor); } else { writtenGeneration = String.valueOf(indexGeneration); writeShardIndexBlobAtomic(shardContainer, indexGeneration, updatedSnapshots); @@ -2829,7 +2846,7 @@ private void writeShardIndexBlobAtomic( () -> new ParameterizedMessage("[{}] Writing shard index [{}] to [{}]", metadata.name(), indexGeneration, shardContainer.path()) ); final String blobName = INDEX_SHARD_SNAPSHOTS_FORMAT.blobName(String.valueOf(indexGeneration)); - writeAtomic(shardContainer, blobName, INDEX_SHARD_SNAPSHOTS_FORMAT.serialize(updatedSnapshots, blobName, compress), true); + writeAtomic(shardContainer, blobName, INDEX_SHARD_SNAPSHOTS_FORMAT.serialize(updatedSnapshots, blobName, compressor), true); } // Unused blobs are all previous index-, data- and meta-blobs and that are not referenced by the new index- as well as all diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java b/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java index 30bcc566729e2..b0dba60e2188f 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java @@ -44,7 +44,7 @@ import org.opensearch.common.CheckedFunction; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.bytes.BytesReference; -import org.opensearch.common.compress.CompressorFactory; +import org.opensearch.common.compress.Compressor; import org.opensearch.common.io.Streams; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.lucene.store.ByteArrayIndexInput; @@ -159,15 +159,15 @@ public T deserialize(String blobName, NamedXContentRegistry namedXContentRegistr * @param obj object to be serialized * @param blobContainer blob container * @param name blob name - * @param compress whether to use compression + * @param compressor whether to use compression */ - public void write(T obj, BlobContainer blobContainer, String name, boolean compress) throws IOException { + public void write(final T obj, final BlobContainer blobContainer, final String name, final Compressor compressor) throws IOException { final String blobName = blobName(name); - final BytesReference bytes = serialize(obj, blobName, compress); + final BytesReference bytes = serialize(obj, blobName, compressor); blobContainer.writeBlob(blobName, bytes.streamInput(), bytes.length(), false); } - public BytesReference serialize(final T obj, final String blobName, final boolean compress) throws IOException { + public BytesReference serialize(final T obj, final String blobName, final Compressor compressor) throws IOException { try (BytesStreamOutput outputStream = new BytesStreamOutput()) { try ( OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput( @@ -187,7 +187,7 @@ public void close() throws IOException { }; XContentBuilder builder = XContentFactory.contentBuilder( XContentType.SMILE, - compress ? CompressorFactory.COMPRESSOR.threadLocalOutputStream(indexOutputOutputStream) : indexOutputOutputStream + compressor.threadLocalOutputStream(indexOutputOutputStream) ) ) { builder.startObject(); diff --git a/server/src/main/java/org/opensearch/repositories/fs/FsRepository.java b/server/src/main/java/org/opensearch/repositories/fs/FsRepository.java index 6f18cdab804e9..0b9989ff64d9c 100644 --- a/server/src/main/java/org/opensearch/repositories/fs/FsRepository.java +++ b/server/src/main/java/org/opensearch/repositories/fs/FsRepository.java @@ -92,7 +92,6 @@ public class FsRepository extends BlobStoreRepository { new ByteSizeValue(Long.MAX_VALUE), Property.NodeScope ); - public static final Setting COMPRESS_SETTING = Setting.boolSetting("compress", false, Property.NodeScope); public static final Setting REPOSITORIES_COMPRESS_SETTING = Setting.boolSetting( "repositories.fs.compress", false, diff --git a/server/src/main/resources/org/opensearch/bootstrap/security.policy b/server/src/main/resources/org/opensearch/bootstrap/security.policy index 3671782b9d12f..77cd0ab05278e 100644 --- a/server/src/main/resources/org/opensearch/bootstrap/security.policy +++ b/server/src/main/resources/org/opensearch/bootstrap/security.policy @@ -79,6 +79,12 @@ grant codeBase "${codebase.jna}" { permission java.lang.RuntimePermission "accessDeclaredMembers"; }; + +// ZSTD compression +grant codeBase "${codebase.zstd-jni}" { + permission java.lang.RuntimePermission "loadLibrary.*"; +}; + //// Everything else: grant { diff --git a/server/src/test/java/org/opensearch/common/compress/AbstractCompressorTests.java b/server/src/test/java/org/opensearch/common/compress/AbstractCompressorTests.java new file mode 100644 index 0000000000000..67ab4e240a4ef --- /dev/null +++ b/server/src/test/java/org/opensearch/common/compress/AbstractCompressorTests.java @@ -0,0 +1,408 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.compress; + +import org.apache.lucene.tests.util.LineFileDocs; +import org.apache.lucene.tests.util.TestUtil; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.Random; +import java.util.concurrent.CountDownLatch; + +abstract class AbstractCompressorTests extends OpenSearchTestCase { + + public void testRandom() throws IOException { + Random r = random(); + for (int i = 0; i < 10; i++) { + final byte[] bytes = new byte[TestUtil.nextInt(r, 1, 100000)]; + r.nextBytes(bytes); + doTest(bytes); + } + } + + public void testRandomThreads() throws Exception { + final Random r = random(); + int threadCount = TestUtil.nextInt(r, 2, 6); + Thread[] threads = new Thread[threadCount]; + final CountDownLatch startingGun = new CountDownLatch(1); + for (int tid = 0; tid < threadCount; tid++) { + final long seed = r.nextLong(); + threads[tid] = new Thread() { + @Override + public void run() { + try { + Random r = new Random(seed); + startingGun.await(); + for (int i = 0; i < 10; i++) { + byte bytes[] = new byte[TestUtil.nextInt(r, 1, 100000)]; + r.nextBytes(bytes); + doTest(bytes); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }; + threads[tid].start(); + } + startingGun.countDown(); + for (Thread t : threads) { + t.join(); + } + } + + public void testLineDocs() throws IOException { + Random r = random(); + LineFileDocs lineFileDocs = new LineFileDocs(r); + for (int i = 0; i < 10; i++) { + int numDocs = TestUtil.nextInt(r, 1, 200); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + for (int j = 0; j < numDocs; j++) { + String s = lineFileDocs.nextDoc().get("body"); + bos.write(s.getBytes(StandardCharsets.UTF_8)); + } + doTest(bos.toByteArray()); + } + lineFileDocs.close(); + } + + public void testLineDocsThreads() throws Exception { + final Random r = random(); + int threadCount = TestUtil.nextInt(r, 2, 6); + Thread[] threads = new Thread[threadCount]; + final CountDownLatch startingGun = new CountDownLatch(1); + for (int tid = 0; tid < threadCount; tid++) { + final long seed = r.nextLong(); + threads[tid] = new Thread() { + @Override + public void run() { + try { + Random r = new Random(seed); + startingGun.await(); + LineFileDocs lineFileDocs = new LineFileDocs(r); + for (int i = 0; i < 10; i++) { + int numDocs = TestUtil.nextInt(r, 1, 200); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + for (int j = 0; j < numDocs; j++) { + String s = lineFileDocs.nextDoc().get("body"); + bos.write(s.getBytes(StandardCharsets.UTF_8)); + } + doTest(bos.toByteArray()); + } + lineFileDocs.close(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }; + threads[tid].start(); + } + startingGun.countDown(); + for (Thread t : threads) { + t.join(); + } + } + + public void testRepetitionsL() throws IOException { + Random r = random(); + for (int i = 0; i < 10; i++) { + int numLongs = TestUtil.nextInt(r, 1, 10000); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + long theValue = r.nextLong(); + for (int j = 0; j < numLongs; j++) { + if (r.nextInt(10) == 0) { + theValue = r.nextLong(); + } + bos.write((byte) (theValue >>> 56)); + bos.write((byte) (theValue >>> 48)); + bos.write((byte) (theValue >>> 40)); + bos.write((byte) (theValue >>> 32)); + bos.write((byte) (theValue >>> 24)); + bos.write((byte) (theValue >>> 16)); + bos.write((byte) (theValue >>> 8)); + bos.write((byte) theValue); + } + doTest(bos.toByteArray()); + } + } + + public void testRepetitionsLThreads() throws Exception { + final Random r = random(); + int threadCount = TestUtil.nextInt(r, 2, 6); + Thread[] threads = new Thread[threadCount]; + final CountDownLatch startingGun = new CountDownLatch(1); + for (int tid = 0; tid < threadCount; tid++) { + final long seed = r.nextLong(); + threads[tid] = new Thread() { + @Override + public void run() { + try { + Random r = new Random(seed); + startingGun.await(); + for (int i = 0; i < 10; i++) { + int numLongs = TestUtil.nextInt(r, 1, 10000); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + long theValue = r.nextLong(); + for (int j = 0; j < numLongs; j++) { + if (r.nextInt(10) == 0) { + theValue = r.nextLong(); + } + bos.write((byte) (theValue >>> 56)); + bos.write((byte) (theValue >>> 48)); + bos.write((byte) (theValue >>> 40)); + bos.write((byte) (theValue >>> 32)); + bos.write((byte) (theValue >>> 24)); + bos.write((byte) (theValue >>> 16)); + bos.write((byte) (theValue >>> 8)); + bos.write((byte) theValue); + } + doTest(bos.toByteArray()); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }; + threads[tid].start(); + } + startingGun.countDown(); + for (Thread t : threads) { + t.join(); + } + } + + public void testRepetitionsI() throws IOException { + Random r = random(); + for (int i = 0; i < 10; i++) { + int numInts = TestUtil.nextInt(r, 1, 20000); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + int theValue = r.nextInt(); + for (int j = 0; j < numInts; j++) { + if (r.nextInt(10) == 0) { + theValue = r.nextInt(); + } + bos.write((byte) (theValue >>> 24)); + bos.write((byte) (theValue >>> 16)); + bos.write((byte) (theValue >>> 8)); + bos.write((byte) theValue); + } + doTest(bos.toByteArray()); + } + } + + public void testRepetitionsIThreads() throws Exception { + final Random r = random(); + int threadCount = TestUtil.nextInt(r, 2, 6); + Thread[] threads = new Thread[threadCount]; + final CountDownLatch startingGun = new CountDownLatch(1); + for (int tid = 0; tid < threadCount; tid++) { + final long seed = r.nextLong(); + threads[tid] = new Thread() { + @Override + public void run() { + try { + Random r = new Random(seed); + startingGun.await(); + for (int i = 0; i < 10; i++) { + int numInts = TestUtil.nextInt(r, 1, 20000); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + int theValue = r.nextInt(); + for (int j = 0; j < numInts; j++) { + if (r.nextInt(10) == 0) { + theValue = r.nextInt(); + } + bos.write((byte) (theValue >>> 24)); + bos.write((byte) (theValue >>> 16)); + bos.write((byte) (theValue >>> 8)); + bos.write((byte) theValue); + } + doTest(bos.toByteArray()); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }; + threads[tid].start(); + } + startingGun.countDown(); + for (Thread t : threads) { + t.join(); + } + } + + public void testRepetitionsS() throws IOException { + Random r = random(); + for (int i = 0; i < 10; i++) { + int numShorts = TestUtil.nextInt(r, 1, 40000); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + short theValue = (short) r.nextInt(65535); + for (int j = 0; j < numShorts; j++) { + if (r.nextInt(10) == 0) { + theValue = (short) r.nextInt(65535); + } + bos.write((byte) (theValue >>> 8)); + bos.write((byte) theValue); + } + doTest(bos.toByteArray()); + } + } + + public void testMixed() throws IOException { + Random r = random(); + LineFileDocs lineFileDocs = new LineFileDocs(r); + for (int i = 0; i < 2; ++i) { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + int prevInt = r.nextInt(); + long prevLong = r.nextLong(); + while (bos.size() < 400000) { + switch (r.nextInt(4)) { + case 0: + addInt(r, prevInt, bos); + break; + case 1: + addLong(r, prevLong, bos); + break; + case 2: + addString(lineFileDocs, bos); + break; + case 3: + addBytes(r, bos); + break; + default: + throw new IllegalStateException("Random is broken"); + } + } + doTest(bos.toByteArray()); + } + } + + private void addLong(Random r, long prev, ByteArrayOutputStream bos) { + long theValue = prev; + if (r.nextInt(10) != 0) { + theValue = r.nextLong(); + } + bos.write((byte) (theValue >>> 56)); + bos.write((byte) (theValue >>> 48)); + bos.write((byte) (theValue >>> 40)); + bos.write((byte) (theValue >>> 32)); + bos.write((byte) (theValue >>> 24)); + bos.write((byte) (theValue >>> 16)); + bos.write((byte) (theValue >>> 8)); + bos.write((byte) theValue); + } + + private void addInt(Random r, int prev, ByteArrayOutputStream bos) { + int theValue = prev; + if (r.nextInt(10) != 0) { + theValue = r.nextInt(); + } + bos.write((byte) (theValue >>> 24)); + bos.write((byte) (theValue >>> 16)); + bos.write((byte) (theValue >>> 8)); + bos.write((byte) theValue); + } + + private void addString(LineFileDocs lineFileDocs, ByteArrayOutputStream bos) throws IOException { + String s = lineFileDocs.nextDoc().get("body"); + bos.write(s.getBytes(StandardCharsets.UTF_8)); + } + + private void addBytes(Random r, ByteArrayOutputStream bos) throws IOException { + byte bytes[] = new byte[TestUtil.nextInt(r, 1, 10000)]; + r.nextBytes(bytes); + bos.write(bytes); + } + + public void testRepetitionsSThreads() throws Exception { + final Random r = random(); + int threadCount = TestUtil.nextInt(r, 2, 6); + Thread[] threads = new Thread[threadCount]; + final CountDownLatch startingGun = new CountDownLatch(1); + for (int tid = 0; tid < threadCount; tid++) { + final long seed = r.nextLong(); + threads[tid] = new Thread() { + @Override + public void run() { + try { + Random r = new Random(seed); + startingGun.await(); + for (int i = 0; i < 10; i++) { + int numShorts = TestUtil.nextInt(r, 1, 40000); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + short theValue = (short) r.nextInt(65535); + for (int j = 0; j < numShorts; j++) { + if (r.nextInt(10) == 0) { + theValue = (short) r.nextInt(65535); + } + bos.write((byte) (theValue >>> 8)); + bos.write((byte) theValue); + } + doTest(bos.toByteArray()); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }; + threads[tid].start(); + } + startingGun.countDown(); + for (Thread t : threads) { + t.join(); + } + } + + private void doTest(byte bytes[]) throws IOException { + InputStream rawIn = new ByteArrayInputStream(bytes); + Compressor c = compressor(); + + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + final Random r = random(); + int bufferSize = r.nextBoolean() ? 65535 : TestUtil.nextInt(random(), 1, 70000); + int prepadding = r.nextInt(70000); + int postpadding = r.nextInt(70000); + byte[] buffer = new byte[prepadding + bufferSize + postpadding]; + int len; + try (OutputStream os = c.threadLocalOutputStream(bos)) { + r.nextBytes(buffer); // fill block completely with junk + while ((len = rawIn.read(buffer, prepadding, bufferSize)) != -1) { + os.write(buffer, prepadding, len); + } + } + rawIn.close(); + + // now we have compressed byte array + InputStream in = c.threadLocalInputStream(new ByteArrayInputStream(bos.toByteArray())); + + // randomize constants again + bufferSize = r.nextBoolean() ? 65535 : TestUtil.nextInt(random(), 1, 70000); + prepadding = r.nextInt(70000); + postpadding = r.nextInt(70000); + buffer = new byte[prepadding + bufferSize + postpadding]; + r.nextBytes(buffer); // fill block completely with junk + + ByteArrayOutputStream uncompressedOut = new ByteArrayOutputStream(); + while ((len = in.read(buffer, prepadding, bufferSize)) != -1) { + uncompressedOut.write(buffer, prepadding, len); + } + uncompressedOut.close(); + + assertArrayEquals(bytes, uncompressedOut.toByteArray()); + } + + abstract Compressor compressor(); + +} diff --git a/server/src/test/java/org/opensearch/common/compress/DeflateCompressTests.java b/server/src/test/java/org/opensearch/common/compress/DeflateCompressTests.java index f215817914ee0..6178dcac9a390 100644 --- a/server/src/test/java/org/opensearch/common/compress/DeflateCompressTests.java +++ b/server/src/test/java/org/opensearch/common/compress/DeflateCompressTests.java @@ -32,403 +32,15 @@ package org.opensearch.common.compress; -import org.apache.lucene.tests.util.LineFileDocs; -import org.apache.lucene.tests.util.TestUtil; -import org.opensearch.test.OpenSearchTestCase; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.charset.StandardCharsets; -import java.util.Random; -import java.util.concurrent.CountDownLatch; - /** * Test streaming compression (e.g. used for recovery) */ -public class DeflateCompressTests extends OpenSearchTestCase { +public class DeflateCompressTests extends AbstractCompressorTests { private final Compressor compressor = new DeflateCompressor(); - public void testRandom() throws IOException { - Random r = random(); - for (int i = 0; i < 10; i++) { - byte bytes[] = new byte[TestUtil.nextInt(r, 1, 100000)]; - r.nextBytes(bytes); - doTest(bytes); - } - } - - public void testRandomThreads() throws Exception { - final Random r = random(); - int threadCount = TestUtil.nextInt(r, 2, 6); - Thread[] threads = new Thread[threadCount]; - final CountDownLatch startingGun = new CountDownLatch(1); - for (int tid = 0; tid < threadCount; tid++) { - final long seed = r.nextLong(); - threads[tid] = new Thread() { - @Override - public void run() { - try { - Random r = new Random(seed); - startingGun.await(); - for (int i = 0; i < 10; i++) { - byte bytes[] = new byte[TestUtil.nextInt(r, 1, 100000)]; - r.nextBytes(bytes); - doTest(bytes); - } - } catch (Exception e) { - throw new RuntimeException(e); - } - } - }; - threads[tid].start(); - } - startingGun.countDown(); - for (Thread t : threads) { - t.join(); - } - } - - public void testLineDocs() throws IOException { - Random r = random(); - LineFileDocs lineFileDocs = new LineFileDocs(r); - for (int i = 0; i < 10; i++) { - int numDocs = TestUtil.nextInt(r, 1, 200); - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - for (int j = 0; j < numDocs; j++) { - String s = lineFileDocs.nextDoc().get("body"); - bos.write(s.getBytes(StandardCharsets.UTF_8)); - } - doTest(bos.toByteArray()); - } - lineFileDocs.close(); - } - - public void testLineDocsThreads() throws Exception { - final Random r = random(); - int threadCount = TestUtil.nextInt(r, 2, 6); - Thread[] threads = new Thread[threadCount]; - final CountDownLatch startingGun = new CountDownLatch(1); - for (int tid = 0; tid < threadCount; tid++) { - final long seed = r.nextLong(); - threads[tid] = new Thread() { - @Override - public void run() { - try { - Random r = new Random(seed); - startingGun.await(); - LineFileDocs lineFileDocs = new LineFileDocs(r); - for (int i = 0; i < 10; i++) { - int numDocs = TestUtil.nextInt(r, 1, 200); - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - for (int j = 0; j < numDocs; j++) { - String s = lineFileDocs.nextDoc().get("body"); - bos.write(s.getBytes(StandardCharsets.UTF_8)); - } - doTest(bos.toByteArray()); - } - lineFileDocs.close(); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - }; - threads[tid].start(); - } - startingGun.countDown(); - for (Thread t : threads) { - t.join(); - } - } - - public void testRepetitionsL() throws IOException { - Random r = random(); - for (int i = 0; i < 10; i++) { - int numLongs = TestUtil.nextInt(r, 1, 10000); - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - long theValue = r.nextLong(); - for (int j = 0; j < numLongs; j++) { - if (r.nextInt(10) == 0) { - theValue = r.nextLong(); - } - bos.write((byte) (theValue >>> 56)); - bos.write((byte) (theValue >>> 48)); - bos.write((byte) (theValue >>> 40)); - bos.write((byte) (theValue >>> 32)); - bos.write((byte) (theValue >>> 24)); - bos.write((byte) (theValue >>> 16)); - bos.write((byte) (theValue >>> 8)); - bos.write((byte) theValue); - } - doTest(bos.toByteArray()); - } - } - - public void testRepetitionsLThreads() throws Exception { - final Random r = random(); - int threadCount = TestUtil.nextInt(r, 2, 6); - Thread[] threads = new Thread[threadCount]; - final CountDownLatch startingGun = new CountDownLatch(1); - for (int tid = 0; tid < threadCount; tid++) { - final long seed = r.nextLong(); - threads[tid] = new Thread() { - @Override - public void run() { - try { - Random r = new Random(seed); - startingGun.await(); - for (int i = 0; i < 10; i++) { - int numLongs = TestUtil.nextInt(r, 1, 10000); - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - long theValue = r.nextLong(); - for (int j = 0; j < numLongs; j++) { - if (r.nextInt(10) == 0) { - theValue = r.nextLong(); - } - bos.write((byte) (theValue >>> 56)); - bos.write((byte) (theValue >>> 48)); - bos.write((byte) (theValue >>> 40)); - bos.write((byte) (theValue >>> 32)); - bos.write((byte) (theValue >>> 24)); - bos.write((byte) (theValue >>> 16)); - bos.write((byte) (theValue >>> 8)); - bos.write((byte) theValue); - } - doTest(bos.toByteArray()); - } - } catch (Exception e) { - throw new RuntimeException(e); - } - } - }; - threads[tid].start(); - } - startingGun.countDown(); - for (Thread t : threads) { - t.join(); - } - } - - public void testRepetitionsI() throws IOException { - Random r = random(); - for (int i = 0; i < 10; i++) { - int numInts = TestUtil.nextInt(r, 1, 20000); - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - int theValue = r.nextInt(); - for (int j = 0; j < numInts; j++) { - if (r.nextInt(10) == 0) { - theValue = r.nextInt(); - } - bos.write((byte) (theValue >>> 24)); - bos.write((byte) (theValue >>> 16)); - bos.write((byte) (theValue >>> 8)); - bos.write((byte) theValue); - } - doTest(bos.toByteArray()); - } - } - - public void testRepetitionsIThreads() throws Exception { - final Random r = random(); - int threadCount = TestUtil.nextInt(r, 2, 6); - Thread[] threads = new Thread[threadCount]; - final CountDownLatch startingGun = new CountDownLatch(1); - for (int tid = 0; tid < threadCount; tid++) { - final long seed = r.nextLong(); - threads[tid] = new Thread() { - @Override - public void run() { - try { - Random r = new Random(seed); - startingGun.await(); - for (int i = 0; i < 10; i++) { - int numInts = TestUtil.nextInt(r, 1, 20000); - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - int theValue = r.nextInt(); - for (int j = 0; j < numInts; j++) { - if (r.nextInt(10) == 0) { - theValue = r.nextInt(); - } - bos.write((byte) (theValue >>> 24)); - bos.write((byte) (theValue >>> 16)); - bos.write((byte) (theValue >>> 8)); - bos.write((byte) theValue); - } - doTest(bos.toByteArray()); - } - } catch (Exception e) { - throw new RuntimeException(e); - } - } - }; - threads[tid].start(); - } - startingGun.countDown(); - for (Thread t : threads) { - t.join(); - } - } - - public void testRepetitionsS() throws IOException { - Random r = random(); - for (int i = 0; i < 10; i++) { - int numShorts = TestUtil.nextInt(r, 1, 40000); - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - short theValue = (short) r.nextInt(65535); - for (int j = 0; j < numShorts; j++) { - if (r.nextInt(10) == 0) { - theValue = (short) r.nextInt(65535); - } - bos.write((byte) (theValue >>> 8)); - bos.write((byte) theValue); - } - doTest(bos.toByteArray()); - } - } - - public void testMixed() throws IOException { - Random r = random(); - LineFileDocs lineFileDocs = new LineFileDocs(r); - for (int i = 0; i < 2; ++i) { - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - int prevInt = r.nextInt(); - long prevLong = r.nextLong(); - while (bos.size() < 400000) { - switch (r.nextInt(4)) { - case 0: - addInt(r, prevInt, bos); - break; - case 1: - addLong(r, prevLong, bos); - break; - case 2: - addString(lineFileDocs, bos); - break; - case 3: - addBytes(r, bos); - break; - default: - throw new IllegalStateException("Random is broken"); - } - } - doTest(bos.toByteArray()); - } - } - - private void addLong(Random r, long prev, ByteArrayOutputStream bos) { - long theValue = prev; - if (r.nextInt(10) != 0) { - theValue = r.nextLong(); - } - bos.write((byte) (theValue >>> 56)); - bos.write((byte) (theValue >>> 48)); - bos.write((byte) (theValue >>> 40)); - bos.write((byte) (theValue >>> 32)); - bos.write((byte) (theValue >>> 24)); - bos.write((byte) (theValue >>> 16)); - bos.write((byte) (theValue >>> 8)); - bos.write((byte) theValue); - } - - private void addInt(Random r, int prev, ByteArrayOutputStream bos) { - int theValue = prev; - if (r.nextInt(10) != 0) { - theValue = r.nextInt(); - } - bos.write((byte) (theValue >>> 24)); - bos.write((byte) (theValue >>> 16)); - bos.write((byte) (theValue >>> 8)); - bos.write((byte) theValue); - } - - private void addString(LineFileDocs lineFileDocs, ByteArrayOutputStream bos) throws IOException { - String s = lineFileDocs.nextDoc().get("body"); - bos.write(s.getBytes(StandardCharsets.UTF_8)); - } - - private void addBytes(Random r, ByteArrayOutputStream bos) throws IOException { - byte bytes[] = new byte[TestUtil.nextInt(r, 1, 10000)]; - r.nextBytes(bytes); - bos.write(bytes); - } - - public void testRepetitionsSThreads() throws Exception { - final Random r = random(); - int threadCount = TestUtil.nextInt(r, 2, 6); - Thread[] threads = new Thread[threadCount]; - final CountDownLatch startingGun = new CountDownLatch(1); - for (int tid = 0; tid < threadCount; tid++) { - final long seed = r.nextLong(); - threads[tid] = new Thread() { - @Override - public void run() { - try { - Random r = new Random(seed); - startingGun.await(); - for (int i = 0; i < 10; i++) { - int numShorts = TestUtil.nextInt(r, 1, 40000); - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - short theValue = (short) r.nextInt(65535); - for (int j = 0; j < numShorts; j++) { - if (r.nextInt(10) == 0) { - theValue = (short) r.nextInt(65535); - } - bos.write((byte) (theValue >>> 8)); - bos.write((byte) theValue); - } - doTest(bos.toByteArray()); - } - } catch (Exception e) { - throw new RuntimeException(e); - } - } - }; - threads[tid].start(); - } - startingGun.countDown(); - for (Thread t : threads) { - t.join(); - } - } - - private void doTest(byte bytes[]) throws IOException { - InputStream rawIn = new ByteArrayInputStream(bytes); - Compressor c = compressor; - - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - final Random r = random(); - int bufferSize = r.nextBoolean() ? 65535 : TestUtil.nextInt(random(), 1, 70000); - int prepadding = r.nextInt(70000); - int postpadding = r.nextInt(70000); - byte[] buffer = new byte[prepadding + bufferSize + postpadding]; - int len; - try (OutputStream os = c.threadLocalOutputStream(bos)) { - r.nextBytes(buffer); // fill block completely with junk - while ((len = rawIn.read(buffer, prepadding, bufferSize)) != -1) { - os.write(buffer, prepadding, len); - } - } - rawIn.close(); - - // now we have compressed byte array - InputStream in = c.threadLocalInputStream(new ByteArrayInputStream(bos.toByteArray())); - - // randomize constants again - bufferSize = r.nextBoolean() ? 65535 : TestUtil.nextInt(random(), 1, 70000); - prepadding = r.nextInt(70000); - postpadding = r.nextInt(70000); - buffer = new byte[prepadding + bufferSize + postpadding]; - r.nextBytes(buffer); // fill block completely with junk - - ByteArrayOutputStream uncompressedOut = new ByteArrayOutputStream(); - while ((len = in.read(buffer, prepadding, bufferSize)) != -1) { - uncompressedOut.write(buffer, prepadding, len); - } - uncompressedOut.close(); - - assertArrayEquals(bytes, uncompressedOut.toByteArray()); + @Override + Compressor compressor() { + return compressor; } } diff --git a/server/src/test/java/org/opensearch/common/compress/ZstdCompressTests.java b/server/src/test/java/org/opensearch/common/compress/ZstdCompressTests.java new file mode 100644 index 0000000000000..b8de4a4e4bb1b --- /dev/null +++ b/server/src/test/java/org/opensearch/common/compress/ZstdCompressTests.java @@ -0,0 +1,22 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.compress; + +/** + * Test streaming compression + */ +public class ZstdCompressTests extends AbstractCompressorTests { + + private final Compressor compressor = new ZstdCompressor(); + + @Override + Compressor compressor() { + return compressor; + } +} diff --git a/server/src/test/java/org/opensearch/snapshots/BlobStoreFormatTests.java b/server/src/test/java/org/opensearch/snapshots/BlobStoreFormatTests.java index d7e017beb5fea..1c43fe73c8a78 100644 --- a/server/src/test/java/org/opensearch/snapshots/BlobStoreFormatTests.java +++ b/server/src/test/java/org/opensearch/snapshots/BlobStoreFormatTests.java @@ -40,6 +40,9 @@ import org.opensearch.common.blobstore.BlobStore; import org.opensearch.common.blobstore.fs.FsBlobStore; import org.opensearch.common.bytes.BytesArray; +import org.opensearch.common.compress.Compressor; +import org.opensearch.common.compress.CompressorFactory; +import org.opensearch.common.compress.CompressorType; import org.opensearch.common.io.Streams; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.io.stream.StreamInput; @@ -54,6 +57,7 @@ import java.io.EOFException; import java.io.IOException; import java.io.InputStream; +import java.util.Arrays; import java.util.Map; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.greaterThan; @@ -117,8 +121,13 @@ public void testBlobStoreOperations() throws IOException { ChecksumBlobStoreFormat checksumSMILE = new ChecksumBlobStoreFormat<>(BLOB_CODEC, "%s", BlobObj::fromXContent); // Write blobs in different formats - checksumSMILE.write(new BlobObj("checksum smile"), blobContainer, "check-smile", false); - checksumSMILE.write(new BlobObj("checksum smile compressed"), blobContainer, "check-smile-comp", true); + checksumSMILE.write(new BlobObj("checksum smile"), blobContainer, "check-smile", CompressorType.NONE.compressor()); + checksumSMILE.write( + new BlobObj("checksum smile compressed"), + blobContainer, + "check-smile-comp", + CompressorFactory.DEFLATE_COMPRESSOR + ); // Assert that all checksum blobs can be read assertEquals(checksumSMILE.read(blobContainer, "check-smile", xContentRegistry()).getText(), "checksum smile"); @@ -134,8 +143,8 @@ public void testCompressionIsApplied() throws IOException { } ChecksumBlobStoreFormat checksumFormat = new ChecksumBlobStoreFormat<>(BLOB_CODEC, "%s", BlobObj::fromXContent); BlobObj blobObj = new BlobObj(veryRedundantText.toString()); - checksumFormat.write(blobObj, blobContainer, "blob-comp", true); - checksumFormat.write(blobObj, blobContainer, "blob-not-comp", false); + checksumFormat.write(blobObj, blobContainer, "blob-comp", CompressorType.DEFLATE.compressor()); + checksumFormat.write(blobObj, blobContainer, "blob-not-comp", CompressorType.NONE.compressor()); Map blobs = blobContainer.listBlobsByPrefix("blob-"); assertEquals(blobs.size(), 2); assertThat(blobs.get("blob-not-comp").length(), greaterThan(blobs.get("blob-comp").length())); @@ -147,7 +156,12 @@ public void testBlobCorruption() throws IOException { String testString = randomAlphaOfLength(randomInt(10000)); BlobObj blobObj = new BlobObj(testString); ChecksumBlobStoreFormat checksumFormat = new ChecksumBlobStoreFormat<>(BLOB_CODEC, "%s", BlobObj::fromXContent); - checksumFormat.write(blobObj, blobContainer, "test-path", randomBoolean()); + checksumFormat.write( + blobObj, + blobContainer, + "test-path", + randomFrom(Arrays.stream(CompressorType.values()).map(CompressorType::compressor).toArray(Compressor[]::new)) + ); assertEquals(checksumFormat.read(blobContainer, "test-path", xContentRegistry()).getText(), testString); randomCorruption(blobContainer, "test-path"); try { diff --git a/test/framework/src/main/java/org/opensearch/repositories/blobstore/OpenSearchBlobStoreRepositoryIntegTestCase.java b/test/framework/src/main/java/org/opensearch/repositories/blobstore/OpenSearchBlobStoreRepositoryIntegTestCase.java index 4fe9f354d8d10..c3075ef7c8a67 100644 --- a/test/framework/src/main/java/org/opensearch/repositories/blobstore/OpenSearchBlobStoreRepositoryIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/repositories/blobstore/OpenSearchBlobStoreRepositoryIntegTestCase.java @@ -47,6 +47,7 @@ import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStore; import org.opensearch.common.bytes.BytesArray; +import org.opensearch.common.compress.CompressorType; import org.opensearch.common.io.Streams; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; @@ -92,7 +93,13 @@ public static RepositoryData getRepositoryData(Repository repository) { protected abstract String repositoryType(); protected Settings repositorySettings() { - return Settings.builder().put("compress", randomBoolean()).build(); + final boolean compress = randomBoolean(); + final Settings.Builder builder = Settings.builder(); + builder.put("compress", compress); + if (compress) { + builder.put("compression_type", randomFrom(CompressorType.values())); + } + return builder.build(); } protected final String createRepository(final String name) { diff --git a/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java b/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java index e6d0a414a8f03..f5497fb60d44d 100644 --- a/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java @@ -52,6 +52,7 @@ import org.opensearch.common.Strings; import org.opensearch.common.UUIDs; import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.compress.CompressorType; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.ByteSizeUnit; import org.opensearch.common.unit.TimeValue; @@ -376,7 +377,11 @@ protected void createRepository(String repoName, String type) { protected Settings.Builder randomRepositorySettings() { final Settings.Builder settings = Settings.builder(); - settings.put("location", randomRepoPath()).put("compress", randomBoolean()); + final boolean compress = randomBoolean(); + settings.put("location", randomRepoPath()).put("compress", compress); + if (compress) { + settings.put("compression_type", randomFrom(CompressorType.values())); + } if (rarely()) { settings.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES); }