SNOW-672156 support specifying compression algorithm to be used for BDEC Parquet files
sfc-gh-gdoci committed Sep 25, 2023
1 parent 687327b commit bc911e4
Showing 8 changed files with 104 additions and 11 deletions.
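For context, a minimal sketch of how a caller could opt into the new codec once this change ships. The property key is the one added to ParameterProvider in this commit; the factory/builder calls are assumptions about the SDK's public client API and are not part of the diff:

import java.util.Properties;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClientFactory;
import net.snowflake.ingest.utils.ParameterProvider;

public class ZstdCompressionExample {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    // url, user, private_key, role, ... (connection properties omitted here)
    // Override the GZIP default so BDEC Parquet files are compressed with ZSTD
    props.put(ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM, "ZSTD");

    try (SnowflakeStreamingIngestClient client =
        SnowflakeStreamingIngestClientFactory.builder("MY_CLIENT").setProperties(props).build()) {
      // open channels and insert rows as usual; flushed chunks use the configured codec
    }
  }
}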
src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java
@@ -4,6 +4,7 @@

package net.snowflake.ingest.streaming.internal;

import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.ParameterProvider;

/** Channel's buffer relevant parameters that are set at the owning client level. */
@@ -15,6 +16,8 @@ public class ClientBufferParameters {

private long maxAllowedRowSizeInBytes;

private Constants.BdecParquetCompression bdecParquetCompression;

/**
* Private constructor used for test methods
*
@@ -26,10 +29,12 @@ public class ClientBufferParameters {
private ClientBufferParameters(
boolean enableParquetInternalBuffering,
long maxChunkSizeInBytes,
long maxAllowedRowSizeInBytes) {
long maxAllowedRowSizeInBytes,
Constants.BdecParquetCompression bdecParquetCompression) {
this.enableParquetInternalBuffering = enableParquetInternalBuffering;
this.maxChunkSizeInBytes = maxChunkSizeInBytes;
this.maxAllowedRowSizeInBytes = maxAllowedRowSizeInBytes;
this.bdecParquetCompression = bdecParquetCompression;
}

/** @param clientInternal reference to the client object where the relevant parameters are set */
@@ -46,6 +51,10 @@ public ClientBufferParameters(SnowflakeStreamingIngestClientInternal clientInter
clientInternal != null
? clientInternal.getParameterProvider().getMaxAllowedRowSizeInBytes()
: ParameterProvider.MAX_ALLOWED_ROW_SIZE_IN_BYTES_DEFAULT;
this.bdecParquetCompression =
clientInternal != null
? clientInternal.getParameterProvider().getBdecParquetCompressionAlgorithm()
: ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT;
}

/**
@@ -58,9 +67,13 @@ public ClientBufferParameters(SnowflakeStreamingIngestClientInternal clientInter
public static ClientBufferParameters test_createClientBufferParameters(
boolean enableParquetInternalBuffering,
long maxChunkSizeInBytes,
long maxAllowedRowSizeInBytes) {
long maxAllowedRowSizeInBytes,
Constants.BdecParquetCompression bdecParquetCompression) {
return new ClientBufferParameters(
enableParquetInternalBuffering, maxChunkSizeInBytes, maxAllowedRowSizeInBytes);
enableParquetInternalBuffering,
maxChunkSizeInBytes,
maxAllowedRowSizeInBytes,
bdecParquetCompression);
}

public boolean getEnableParquetInternalBuffering() {
@@ -74,4 +87,8 @@ public long getMaxChunkSizeInBytes() {
public long getMaxAllowedRowSizeInBytes() {
return maxAllowedRowSizeInBytes;
}

public Constants.BdecParquetCompression getBdecParquetCompression() {
return bdecParquetCompression;
}
}
src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java
@@ -9,6 +9,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.Logging;
import net.snowflake.ingest.utils.Pair;
@@ -27,15 +28,21 @@ public class ParquetFlusher implements Flusher<ParquetChunkData> {
private final boolean enableParquetInternalBuffering;
private final long maxChunkSizeInBytes;

private final Constants.BdecParquetCompression bdecParquetCompression;

/**
* Construct parquet flusher from its schema and set flag that indicates whether Parquet memory
* optimization is enabled, i.e. rows will be buffered in internal Parquet buffer.
*/
public ParquetFlusher(
MessageType schema, boolean enableParquetInternalBuffering, long maxChunkSizeInBytes) {
MessageType schema,
boolean enableParquetInternalBuffering,
long maxChunkSizeInBytes,
Constants.BdecParquetCompression bdecParquetCompression) {
this.schema = schema;
this.enableParquetInternalBuffering = enableParquetInternalBuffering;
this.maxChunkSizeInBytes = maxChunkSizeInBytes;
this.bdecParquetCompression = bdecParquetCompression;
}

@Override
@@ -198,7 +205,12 @@ private SerializationResult serializeFromJavaObjects(
Map<String, String> metadata = channelsDataPerTable.get(0).getVectors().metadata;
parquetWriter =
new BdecParquetWriter(
mergedData, schema, metadata, firstChannelFullyQualifiedTableName, maxChunkSizeInBytes);
mergedData,
schema,
metadata,
firstChannelFullyQualifiedTableName,
maxChunkSizeInBytes,
bdecParquetCompression);
rows.forEach(parquetWriter::writeRow);
parquetWriter.close();

src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java
@@ -121,7 +121,8 @@ private void createFileWriter() {
schema,
metadata,
channelName,
clientBufferParameters.getMaxChunkSizeInBytes());
clientBufferParameters.getMaxChunkSizeInBytes(),
clientBufferParameters.getBdecParquetCompression());
} else {
this.bdecParquetWriter = null;
}
@@ -322,7 +323,8 @@ public Flusher<ParquetChunkData> createFlusher() {
return new ParquetFlusher(
schema,
clientBufferParameters.getEnableParquetInternalBuffering(),
clientBufferParameters.getMaxChunkSizeInBytes());
clientBufferParameters.getMaxChunkSizeInBytes(),
clientBufferParameters.getBdecParquetCompression());
}

private static class ParquetColumn {
29 changes: 29 additions & 0 deletions src/main/java/net/snowflake/ingest/utils/Constants.java
@@ -5,6 +5,7 @@
package net.snowflake.ingest.utils;

import java.util.Arrays;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

/** Contains all the constants needed for Streaming Ingest */
public class Constants {
@@ -113,6 +114,34 @@ public static BdecVersion fromInt(int val) {
}
}

/**
* Compression algorithm supported by BDEC Parquet Writer. It is a wrapper around Parquet's lib
* CompressionCodecName, but we want to control and allow only specific values of that.
*/
public enum BdecParquetCompression {
UNCOMPRESSED,
GZIP,
SNAPPY,
ZSTD;

public CompressionCodecName getCompressionCodec() {
return (this == UNCOMPRESSED
? CompressionCodecName.fromConf(null)
: CompressionCodecName.fromConf(this.name()));
}

public static BdecParquetCompression fromName(String name) {
for (BdecParquetCompression e : BdecParquetCompression.values()) {
if (e.name().toLowerCase().equals(name.toLowerCase())) {
return e;
}
}
throw new IllegalArgumentException(
String.format(
"Unsupported BDEC_PARQUET_COMPRESSION_ALGORITHM = '%d', allowed values are %s",
name, Arrays.asList(BdecParquetCompression.values())));
}
}
// Parameters
public static final boolean DISABLE_BACKGROUND_FLUSH = false;
public static final boolean COMPRESS_BLOB_TWICE = false;
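A quick sketch (not part of the commit) of how the new enum behaves: fromName resolves a user-supplied string case-insensitively, and getCompressionCodec maps to Parquet's CompressionCodecName, routing UNCOMPRESSED through fromConf(null):

import net.snowflake.ingest.utils.Constants;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class CompressionMappingExample {
  public static void main(String[] args) {
    // Case-insensitive lookup of the configured value
    Constants.BdecParquetCompression compression =
        Constants.BdecParquetCompression.fromName("zstd");

    // Maps to the Parquet codec of the same name; UNCOMPRESSED resolves via fromConf(null)
    CompressionCodecName codec = compression.getCompressionCodec();
    System.out.println(compression + " -> " + codec); // ZSTD -> ZSTD

    // Any unsupported string (e.g. "lzo") throws IllegalArgumentException listing the allowed values
  }
}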
23 changes: 23 additions & 0 deletions src/main/java/net/snowflake/ingest/utils/ParameterProvider.java
@@ -36,6 +36,9 @@ public class ParameterProvider {

public static final String MAX_CLIENT_LAG_ENABLED = "MAX_CLIENT_LAG_ENABLED".toLowerCase();

public static final String BDEC_PARQUET_COMPRESSION_ALGORITHM =
"BDEC_PARQUET_COMPRESSION_ALGORITHM".toLowerCase();

// Default values
public static final long BUFFER_FLUSH_INTERVAL_IN_MILLIS_DEFAULT = 1000;
public static final long BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS_DEFAULT = 100;
@@ -60,6 +63,9 @@ public class ParameterProvider {
static final long MAX_CLIENT_LAG_MS_MAX = TimeUnit.MINUTES.toMillis(10);
public static final long MAX_ALLOWED_ROW_SIZE_IN_BYTES_DEFAULT = 64 * 1024 * 1024; // 64 MB

public static final Constants.BdecParquetCompression BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT =
Constants.BdecParquetCompression.GZIP;

/* Parameter that enables using internal Parquet buffers for buffering of rows before serializing.
It reduces memory consumption compared to using Java Objects for buffering.*/
public static final boolean ENABLE_PARQUET_INTERNAL_BUFFERING_DEFAULT = false;
@@ -170,6 +176,12 @@ private void setParameterMap(Map<String, Object> parameterOverrides, Properties
this.updateValue(MAX_CLIENT_LAG, MAX_CLIENT_LAG_DEFAULT, parameterOverrides, props);
this.updateValue(
MAX_CLIENT_LAG_ENABLED, MAX_CLIENT_LAG_ENABLED_DEFAULT, parameterOverrides, props);

this.updateValue(
BDEC_PARQUET_COMPRESSION_ALGORITHM,
BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT,
parameterOverrides,
props);
}

/** @return Longest interval in milliseconds between buffer flushes */
Expand Down Expand Up @@ -376,6 +388,17 @@ public long getMaxAllowedRowSizeInBytes() {
return (val instanceof String) ? Long.parseLong(val.toString()) : (long) val;
}

/** @return BDEC compression algorithm */
public Constants.BdecParquetCompression getBdecParquetCompressionAlgorithm() {
Object val =
this.parameterMap.getOrDefault(
BDEC_PARQUET_COMPRESSION_ALGORITHM, BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT);
if (val instanceof Constants.BdecParquetCompression) {
return (Constants.BdecParquetCompression) val;
}
return Constants.BdecParquetCompression.fromName((String) val);
}

@Override
public String toString() {
return "ParameterProvider{" + "parameterMap=" + parameterMap + '}';
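To make the accepted value types concrete, a small sketch assuming ParameterProvider exposes the (Map, Properties) constructor its test below suggests: the override may be the String name, which goes through BdecParquetCompression.fromName, or the enum constant itself:

import java.util.HashMap;
import java.util.Map;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.ParameterProvider;

public class CompressionOverrideExample {
  public static void main(String[] args) {
    Map<String, Object> overrides = new HashMap<>();
    // String form, resolved case-insensitively by fromName ...
    overrides.put(ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM, "zstd");
    // ... or the enum constant directly:
    // overrides.put(ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM,
    //     Constants.BdecParquetCompression.ZSTD);

    ParameterProvider parameterProvider = new ParameterProvider(overrides, null);
    System.out.println(parameterProvider.getBdecParquetCompressionAlgorithm()); // ZSTD
  }
}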
src/main/java/net/snowflake/ingest/utils/BdecParquetWriter.java
@@ -17,7 +17,6 @@
import org.apache.parquet.column.values.factory.DefaultV1ValuesWriterFactory;
import org.apache.parquet.crypto.FileEncryptionProperties;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.io.DelegatingPositionOutputStream;
import org.apache.parquet.io.OutputFile;
import org.apache.parquet.io.ParquetEncodingException;
@@ -51,7 +50,8 @@ public BdecParquetWriter(
MessageType schema,
Map<String, String> extraMetaData,
String channelName,
long maxChunkSizeInBytes)
long maxChunkSizeInBytes,
Constants.BdecParquetCompression bdecParquetCompression)
throws IOException {
OutputFile file = new ByteArrayOutputFile(stream, maxChunkSizeInBytes);
ParquetProperties encodingProps = createParquetProperties();
@@ -86,7 +86,8 @@ public BdecParquetWriter(
*/
codecFactory = new CodecFactory(conf, ParquetWriter.DEFAULT_PAGE_SIZE);
@SuppressWarnings("deprecation") // Parquet does not support the new one now
CodecFactory.BytesCompressor compressor = codecFactory.getCompressor(CompressionCodecName.GZIP);
CodecFactory.BytesCompressor compressor =
codecFactory.getCompressor(bdecParquetCompression.getCompressionCodec());
writer =
new InternalParquetRecordWriter<>(
fileWriter,
src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java
@@ -3,6 +3,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.ParameterProvider;
import org.junit.Assert;
import org.junit.Test;
@@ -20,6 +21,7 @@ private Map<String, Object> getStartingParameterMap() {
parameterMap.put(ParameterProvider.BLOB_UPLOAD_MAX_RETRY_COUNT, 100);
parameterMap.put(ParameterProvider.MAX_MEMORY_LIMIT_IN_BYTES, 1000L);
parameterMap.put(ParameterProvider.MAX_CHANNEL_SIZE_IN_BYTES, 1000000L);
parameterMap.put(ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM, "zstd");
return parameterMap;
}

@@ -39,6 +41,9 @@ public void withValuesSet() {
Assert.assertEquals(100, parameterProvider.getBlobUploadMaxRetryCount());
Assert.assertEquals(1000L, parameterProvider.getMaxMemoryLimitInBytes());
Assert.assertEquals(1000000L, parameterProvider.getMaxChannelSizeInBytes());
Assert.assertEquals(
Constants.BdecParquetCompression.ZSTD,
parameterProvider.getBdecParquetCompressionAlgorithm());
}

@Test
@@ -130,6 +135,9 @@ public void withDefaultValues() {
Assert.assertEquals(
ParameterProvider.MAX_CHANNEL_SIZE_IN_BYTES_DEFAULT,
parameterProvider.getMaxChannelSizeInBytes());
Assert.assertEquals(
ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT,
parameterProvider.getBdecParquetCompressionAlgorithm());
}

@Test
Expand Down
src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java
@@ -118,7 +118,8 @@ private AbstractRowBuffer<?> createTestBuffer(OpenChannelRequest.OnErrorOption o
ClientBufferParameters.test_createClientBufferParameters(
enableParquetMemoryOptimization,
MAX_CHUNK_SIZE_IN_BYTES_DEFAULT,
MAX_ALLOWED_ROW_SIZE_IN_BYTES_DEFAULT));
MAX_ALLOWED_ROW_SIZE_IN_BYTES_DEFAULT,
Constants.BdecParquetCompression.GZIP));
}

@Test
