From bc911e411bd00b89ec184621d482175b78eb8378 Mon Sep 17 00:00:00 2001
From: Gloria Doci
Date: Tue, 29 Aug 2023 12:19:28 +0000
Subject: [PATCH] SNOW-672156 support specifying compression algorithm to be used for BDEC Parquet files

---
 .../internal/ClientBufferParameters.java       | 23 +++++++++++++--
 .../streaming/internal/ParquetFlusher.java     | 16 ++++++++--
 .../streaming/internal/ParquetRowBuffer.java   |  6 ++--
 .../net/snowflake/ingest/utils/Constants.java  | 29 +++++++++++++++++++
 .../ingest/utils/ParameterProvider.java        | 23 +++++++++++++++
 .../parquet/hadoop/BdecParquetWriter.java      |  7 +++--
 .../internal/ParameterProviderTest.java        |  8 +++++
 .../streaming/internal/RowBufferTest.java      |  3 +-
 8 files changed, 104 insertions(+), 11 deletions(-)

diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java b/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java
index dffd824c8..278d4abea 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java
@@ -4,6 +4,7 @@ package net.snowflake.ingest.streaming.internal;
 
+import net.snowflake.ingest.utils.Constants;
 import net.snowflake.ingest.utils.ParameterProvider;
 
 /** Channel's buffer relevant parameters that are set at the owning client level. */
@@ -15,6 +16,8 @@ public class ClientBufferParameters {
   private long maxAllowedRowSizeInBytes;
 
+  private Constants.BdecParquetCompression bdecParquetCompression;
+
   /**
    * Private constructor used for test methods
    *
@@ -26,10 +29,12 @@ private ClientBufferParameters(
       boolean enableParquetInternalBuffering,
       long maxChunkSizeInBytes,
-      long maxAllowedRowSizeInBytes) {
+      long maxAllowedRowSizeInBytes,
+      Constants.BdecParquetCompression bdecParquetCompression) {
     this.enableParquetInternalBuffering = enableParquetInternalBuffering;
     this.maxChunkSizeInBytes = maxChunkSizeInBytes;
     this.maxAllowedRowSizeInBytes = maxAllowedRowSizeInBytes;
+    this.bdecParquetCompression = bdecParquetCompression;
   }
 
   /** @param clientInternal reference to the client object where the relevant parameters are set */
@@ -46,6 +51,10 @@ public ClientBufferParameters(SnowflakeStreamingIngestClientInternal clientInter
         clientInternal != null
             ? clientInternal.getParameterProvider().getMaxAllowedRowSizeInBytes()
             : ParameterProvider.MAX_ALLOWED_ROW_SIZE_IN_BYTES_DEFAULT;
+    this.bdecParquetCompression =
+        clientInternal != null
+            ? clientInternal.getParameterProvider().getBdecParquetCompressionAlgorithm()
+            : ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT;
   }
 
   /**
@@ -58,9 +67,13 @@ public ClientBufferParameters(SnowflakeStreamingIngestClientInternal clientInter
   public static ClientBufferParameters test_createClientBufferParameters(
       boolean enableParquetInternalBuffering,
       long maxChunkSizeInBytes,
-      long maxAllowedRowSizeInBytes) {
+      long maxAllowedRowSizeInBytes,
+      Constants.BdecParquetCompression bdecParquetCompression) {
     return new ClientBufferParameters(
-        enableParquetInternalBuffering, maxChunkSizeInBytes, maxAllowedRowSizeInBytes);
+        enableParquetInternalBuffering,
+        maxChunkSizeInBytes,
+        maxAllowedRowSizeInBytes,
+        bdecParquetCompression);
   }
 
   public boolean getEnableParquetInternalBuffering() {
@@ -74,4 +87,8 @@ public long getMaxChunkSizeInBytes() {
   public long getMaxAllowedRowSizeInBytes() {
     return maxAllowedRowSizeInBytes;
   }
+
+  public Constants.BdecParquetCompression getBdecParquetCompression() {
+    return bdecParquetCompression;
+  }
 }
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java
index 87c30207d..e81dbc0eb 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java
@@ -9,6 +9,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import net.snowflake.ingest.utils.Constants;
 import net.snowflake.ingest.utils.ErrorCode;
 import net.snowflake.ingest.utils.Logging;
 import net.snowflake.ingest.utils.Pair;
@@ -27,15 +28,21 @@ public class ParquetFlusher implements Flusher {
   private final boolean enableParquetInternalBuffering;
   private final long maxChunkSizeInBytes;
 
+  private final Constants.BdecParquetCompression bdecParquetCompression;
+
   /**
    * Construct parquet flusher from its schema and set flag that indicates whether Parquet memory
    * optimization is enabled, i.e. rows will be buffered in internal Parquet buffer.
    */
   public ParquetFlusher(
-      MessageType schema, boolean enableParquetInternalBuffering, long maxChunkSizeInBytes) {
+      MessageType schema,
+      boolean enableParquetInternalBuffering,
+      long maxChunkSizeInBytes,
+      Constants.BdecParquetCompression bdecParquetCompression) {
     this.schema = schema;
     this.enableParquetInternalBuffering = enableParquetInternalBuffering;
     this.maxChunkSizeInBytes = maxChunkSizeInBytes;
+    this.bdecParquetCompression = bdecParquetCompression;
   }
 
   @Override
@@ -198,7 +205,12 @@ private SerializationResult serializeFromJavaObjects(
     Map metadata = channelsDataPerTable.get(0).getVectors().metadata;
     parquetWriter =
         new BdecParquetWriter(
-            mergedData, schema, metadata, firstChannelFullyQualifiedTableName, maxChunkSizeInBytes);
+            mergedData,
+            schema,
+            metadata,
+            firstChannelFullyQualifiedTableName,
+            maxChunkSizeInBytes,
+            bdecParquetCompression);
     rows.forEach(parquetWriter::writeRow);
     parquetWriter.close();
 
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java
index 289b8a983..9c838fbef 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java
@@ -121,7 +121,8 @@ private void createFileWriter() {
               schema,
               metadata,
               channelName,
-              clientBufferParameters.getMaxChunkSizeInBytes());
+              clientBufferParameters.getMaxChunkSizeInBytes(),
+              clientBufferParameters.getBdecParquetCompression());
     } else {
       this.bdecParquetWriter = null;
     }
@@ -322,7 +323,8 @@ public Flusher createFlusher() {
     return new ParquetFlusher(
         schema,
         clientBufferParameters.getEnableParquetInternalBuffering(),
-        clientBufferParameters.getMaxChunkSizeInBytes());
+        clientBufferParameters.getMaxChunkSizeInBytes(),
+        clientBufferParameters.getBdecParquetCompression());
   }
 
   private static class ParquetColumn {
diff --git a/src/main/java/net/snowflake/ingest/utils/Constants.java b/src/main/java/net/snowflake/ingest/utils/Constants.java
index 09814235b..b24279ab9 100644
--- a/src/main/java/net/snowflake/ingest/utils/Constants.java
+++ b/src/main/java/net/snowflake/ingest/utils/Constants.java
@@ -5,6 +5,7 @@ package net.snowflake.ingest.utils;
 
 import java.util.Arrays;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 
 /** Contains all the constants needed for Streaming Ingest */
 public class Constants {
@@ -113,6 +114,34 @@ public static BdecVersion fromInt(int val) {
     }
   }
 
+  /**
+   * Compression algorithm supported by the BDEC Parquet writer. This wraps Parquet's
+   * CompressionCodecName and restricts it to the values that we allow.
+   */
+  public enum BdecParquetCompression {
+    UNCOMPRESSED,
+    GZIP,
+    SNAPPY,
+    ZSTD;
+
+    public CompressionCodecName getCompressionCodec() {
+      return (this == UNCOMPRESSED
+          ? CompressionCodecName.fromConf(null)
+          : CompressionCodecName.fromConf(this.name()));
+    }
+
+    public static BdecParquetCompression fromName(String name) {
+      for (BdecParquetCompression e : BdecParquetCompression.values()) {
+        if (e.name().toLowerCase().equals(name.toLowerCase())) {
+          return e;
+        }
+      }
+      throw new IllegalArgumentException(
+          String.format(
+              "Unsupported BDEC_PARQUET_COMPRESSION_ALGORITHM = '%s', allowed values are %s",
+              name, Arrays.asList(BdecParquetCompression.values())));
+    }
+  }
 
   // Parameters
   public static final boolean DISABLE_BACKGROUND_FLUSH = false;
   public static final boolean COMPRESS_BLOB_TWICE = false;
diff --git a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java
index 5c6f81f66..2a368a518 100644
--- a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java
+++ b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java
@@ -36,6 +36,9 @@ public class ParameterProvider {
   public static final String MAX_CLIENT_LAG_ENABLED = "MAX_CLIENT_LAG_ENABLED".toLowerCase();
 
+  public static final String BDEC_PARQUET_COMPRESSION_ALGORITHM =
+      "BDEC_PARQUET_COMPRESSION_ALGORITHM".toLowerCase();
+
   // Default values
   public static final long BUFFER_FLUSH_INTERVAL_IN_MILLIS_DEFAULT = 1000;
   public static final long BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS_DEFAULT = 100;
@@ -60,6 +63,9 @@ public class ParameterProvider {
   static final long MAX_CLIENT_LAG_MS_MAX = TimeUnit.MINUTES.toMillis(10);
 
   public static final long MAX_ALLOWED_ROW_SIZE_IN_BYTES_DEFAULT = 64 * 1024 * 1024; // 64 MB
 
+  public static final Constants.BdecParquetCompression BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT =
+      Constants.BdecParquetCompression.GZIP;
+
   /* Parameter that enables using internal Parquet buffers for buffering of rows before serializing.
   It reduces memory consumption compared to using Java Objects for buffering.*/
   public static final boolean ENABLE_PARQUET_INTERNAL_BUFFERING_DEFAULT = false;
@@ -170,6 +176,12 @@ private void setParameterMap(Map parameterOverrides, Properties
     this.updateValue(MAX_CLIENT_LAG, MAX_CLIENT_LAG_DEFAULT, parameterOverrides, props);
     this.updateValue(
         MAX_CLIENT_LAG_ENABLED, MAX_CLIENT_LAG_ENABLED_DEFAULT, parameterOverrides, props);
+
+    this.updateValue(
+        BDEC_PARQUET_COMPRESSION_ALGORITHM,
+        BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT,
+        parameterOverrides,
+        props);
   }
 
   /** @return Longest interval in milliseconds between buffer flushes */
@@ -376,6 +388,17 @@ public long getMaxAllowedRowSizeInBytes() {
     return (val instanceof String) ? Long.parseLong(val.toString()) : (long) val;
   }
 
+  /** @return BDEC compression algorithm */
+  public Constants.BdecParquetCompression getBdecParquetCompressionAlgorithm() {
+    Object val =
+        this.parameterMap.getOrDefault(
+            BDEC_PARQUET_COMPRESSION_ALGORITHM, BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT);
+    if (val instanceof Constants.BdecParquetCompression) {
+      return (Constants.BdecParquetCompression) val;
+    }
+    return Constants.BdecParquetCompression.fromName((String) val);
+  }
+
   @Override
   public String toString() {
     return "ParameterProvider{" + "parameterMap=" + parameterMap + '}';
diff --git a/src/main/java/org/apache/parquet/hadoop/BdecParquetWriter.java b/src/main/java/org/apache/parquet/hadoop/BdecParquetWriter.java
index b2442d3dc..8b71cfd0e 100644
--- a/src/main/java/org/apache/parquet/hadoop/BdecParquetWriter.java
+++ b/src/main/java/org/apache/parquet/hadoop/BdecParquetWriter.java
@@ -17,7 +17,6 @@ import org.apache.parquet.column.values.factory.DefaultV1ValuesWriterFactory;
 import org.apache.parquet.crypto.FileEncryptionProperties;
 import org.apache.parquet.hadoop.api.WriteSupport;
-import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.io.DelegatingPositionOutputStream;
 import org.apache.parquet.io.OutputFile;
 import org.apache.parquet.io.ParquetEncodingException;
@@ -51,7 +50,8 @@ public BdecParquetWriter(
       MessageType schema,
       Map extraMetaData,
       String channelName,
-      long maxChunkSizeInBytes)
+      long maxChunkSizeInBytes,
+      Constants.BdecParquetCompression bdecParquetCompression)
       throws IOException {
     OutputFile file = new ByteArrayOutputFile(stream, maxChunkSizeInBytes);
     ParquetProperties encodingProps = createParquetProperties();
@@ -86,7 +86,8 @@ public BdecParquetWriter(
      */
     codecFactory = new CodecFactory(conf, ParquetWriter.DEFAULT_PAGE_SIZE);
     @SuppressWarnings("deprecation") // Parquet does not support the new one now
-    CodecFactory.BytesCompressor compressor = codecFactory.getCompressor(CompressionCodecName.GZIP);
+    CodecFactory.BytesCompressor compressor =
+        codecFactory.getCompressor(bdecParquetCompression.getCompressionCodec());
     writer =
         new InternalParquetRecordWriter<>(
             fileWriter,
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java
index def5f7ecf..f97618ea0 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java
@@ -3,6 +3,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
+import net.snowflake.ingest.utils.Constants;
 import net.snowflake.ingest.utils.ParameterProvider;
 import org.junit.Assert;
 import org.junit.Test;
@@ -20,6 +21,7 @@ private Map getStartingParameterMap() {
     parameterMap.put(ParameterProvider.BLOB_UPLOAD_MAX_RETRY_COUNT, 100);
     parameterMap.put(ParameterProvider.MAX_MEMORY_LIMIT_IN_BYTES, 1000L);
     parameterMap.put(ParameterProvider.MAX_CHANNEL_SIZE_IN_BYTES, 1000000L);
+    parameterMap.put(ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM, "zstd");
     return parameterMap;
   }
 
@@ -39,6 +41,9 @@ public void withValuesSet() {
     Assert.assertEquals(100, parameterProvider.getBlobUploadMaxRetryCount());
     Assert.assertEquals(1000L, parameterProvider.getMaxMemoryLimitInBytes());
     Assert.assertEquals(1000000L, parameterProvider.getMaxChannelSizeInBytes());
+    Assert.assertEquals(
+        Constants.BdecParquetCompression.ZSTD,
+        parameterProvider.getBdecParquetCompressionAlgorithm());
   }
 
   @Test
@@ -130,6 +135,9 @@ public void withDefaultValues() {
     Assert.assertEquals(
         ParameterProvider.MAX_CHANNEL_SIZE_IN_BYTES_DEFAULT,
         parameterProvider.getMaxChannelSizeInBytes());
+    Assert.assertEquals(
+        ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT,
+        parameterProvider.getBdecParquetCompressionAlgorithm());
   }
 
   @Test
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java
index 63340e25a..79c543173 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java
@@ -118,7 +118,8 @@ private AbstractRowBuffer createTestBuffer(OpenChannelRequest.OnErrorOption o
         ClientBufferParameters.test_createClientBufferParameters(
             enableParquetMemoryOptimization,
             MAX_CHUNK_SIZE_IN_BYTES_DEFAULT,
-            MAX_ALLOWED_ROW_SIZE_IN_BYTES_DEFAULT));
+            MAX_ALLOWED_ROW_SIZE_IN_BYTES_DEFAULT,
+            Constants.BdecParquetCompression.GZIP));
   }
 
   @Test
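
Usage sketch (not part of the patch, mirroring ParameterProviderTest above): the new
BDEC_PARQUET_COMPRESSION_ALGORITHM parameter is read by ParameterProvider, carried through
ClientBufferParameters, and finally handed to BdecParquetWriter's CodecFactory. The wrapper class
below and the ParameterProvider(Map, Properties) constructor are assumptions for illustration;
when the parameter is not set, GZIP remains the default.

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.ParameterProvider;

// Illustrative only; the class name and the ParameterProvider(Map, Properties) constructor
// are assumptions, not part of this diff.
public class BdecCompressionExample {
  public static void main(String[] args) {
    Map<String, Object> parameterOverrides = new HashMap<>();
    // The value is matched case-insensitively by Constants.BdecParquetCompression.fromName.
    parameterOverrides.put(ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM, "zstd");

    ParameterProvider parameterProvider =
        new ParameterProvider(parameterOverrides, new Properties());

    Constants.BdecParquetCompression compression =
        parameterProvider.getBdecParquetCompressionAlgorithm();
    // getCompressionCodec() maps the enum to Parquet's CompressionCodecName.
    System.out.println(compression.getCompressionCodec()); // ZSTD
  }
}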