SNOW-672156 support specifying compression algorithm to be used for BDEC Parquet files. GZIP is the only allowed value for now. (#579)

sfc-gh-gdoci authored Oct 31, 2023
1 parent 4450fc4 commit d0fd1d8
Showing 8 changed files with 133 additions and 12 deletions.
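For orientation, a minimal sketch of how a caller might opt into the new parameter (the connection properties are placeholders, and the builder/Properties path is assumed from the SDK's usual client setup; GZIP is the only value this change accepts):

```java
import java.util.Properties;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClientFactory;

public class CompressionConfigExample {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    // Placeholder connection settings -- replace with real values for your account.
    props.put("url", "https://<account>.snowflakecomputing.com:443");
    props.put("user", "<user>");
    props.put("private_key", "<private_key>");
    props.put("role", "<role>");

    // New parameter introduced by this commit (keys are lowercased by ParameterProvider).
    // GZIP is currently the only allowed value.
    props.put("bdec_parquet_compression_algorithm", "gzip");

    try (SnowflakeStreamingIngestClient client =
        SnowflakeStreamingIngestClientFactory.builder("MY_CLIENT").setProperties(props).build()) {
      // Channels opened from this client will write GZIP-compressed BDEC Parquet files.
    }
  }
}
```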
@@ -4,6 +4,7 @@

package net.snowflake.ingest.streaming.internal;

import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.ParameterProvider;

/** Channel's buffer relevant parameters that are set at the owning client level. */
@@ -15,6 +16,8 @@ public class ClientBufferParameters {

private long maxAllowedRowSizeInBytes;

private Constants.BdecParquetCompression bdecParquetCompression;

/**
* Private constructor used for test methods
*
@@ -26,10 +29,12 @@ public class ClientBufferParameters {
private ClientBufferParameters(
boolean enableParquetInternalBuffering,
long maxChunkSizeInBytes,
long maxAllowedRowSizeInBytes) {
long maxAllowedRowSizeInBytes,
Constants.BdecParquetCompression bdecParquetCompression) {
this.enableParquetInternalBuffering = enableParquetInternalBuffering;
this.maxChunkSizeInBytes = maxChunkSizeInBytes;
this.maxAllowedRowSizeInBytes = maxAllowedRowSizeInBytes;
this.bdecParquetCompression = bdecParquetCompression;
}

/** @param clientInternal reference to the client object where the relevant parameters are set */
@@ -46,6 +51,10 @@ public ClientBufferParameters(SnowflakeStreamingIngestClientInternal clientInter
clientInternal != null
? clientInternal.getParameterProvider().getMaxAllowedRowSizeInBytes()
: ParameterProvider.MAX_ALLOWED_ROW_SIZE_IN_BYTES_DEFAULT;
this.bdecParquetCompression =
clientInternal != null
? clientInternal.getParameterProvider().getBdecParquetCompressionAlgorithm()
: ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT;
}

/**
@@ -58,9 +67,13 @@ public ClientBufferParameters(SnowflakeStreamingIngestClientInternal clientInter
public static ClientBufferParameters test_createClientBufferParameters(
boolean enableParquetInternalBuffering,
long maxChunkSizeInBytes,
long maxAllowedRowSizeInBytes) {
long maxAllowedRowSizeInBytes,
Constants.BdecParquetCompression bdecParquetCompression) {
return new ClientBufferParameters(
enableParquetInternalBuffering, maxChunkSizeInBytes, maxAllowedRowSizeInBytes);
enableParquetInternalBuffering,
maxChunkSizeInBytes,
maxAllowedRowSizeInBytes,
bdecParquetCompression);
}

public boolean getEnableParquetInternalBuffering() {
@@ -74,4 +87,8 @@ public long getMaxChunkSizeInBytes() {
public long getMaxAllowedRowSizeInBytes() {
return maxAllowedRowSizeInBytes;
}

public Constants.BdecParquetCompression getBdecParquetCompression() {
return bdecParquetCompression;
}
}
@@ -9,6 +9,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.Logging;
import net.snowflake.ingest.utils.Pair;
@@ -27,15 +28,21 @@ public class ParquetFlusher implements Flusher<ParquetChunkData> {
private final boolean enableParquetInternalBuffering;
private final long maxChunkSizeInBytes;

private final Constants.BdecParquetCompression bdecParquetCompression;

/**
* Construct parquet flusher from its schema and set flag that indicates whether Parquet memory
* optimization is enabled, i.e. rows will be buffered in internal Parquet buffer.
*/
public ParquetFlusher(
MessageType schema, boolean enableParquetInternalBuffering, long maxChunkSizeInBytes) {
MessageType schema,
boolean enableParquetInternalBuffering,
long maxChunkSizeInBytes,
Constants.BdecParquetCompression bdecParquetCompression) {
this.schema = schema;
this.enableParquetInternalBuffering = enableParquetInternalBuffering;
this.maxChunkSizeInBytes = maxChunkSizeInBytes;
this.bdecParquetCompression = bdecParquetCompression;
}

@Override
@@ -198,7 +205,12 @@ private SerializationResult serializeFromJavaObjects(
Map<String, String> metadata = channelsDataPerTable.get(0).getVectors().metadata;
parquetWriter =
new BdecParquetWriter(
mergedData, schema, metadata, firstChannelFullyQualifiedTableName, maxChunkSizeInBytes);
mergedData,
schema,
metadata,
firstChannelFullyQualifiedTableName,
maxChunkSizeInBytes,
bdecParquetCompression);
rows.forEach(parquetWriter::writeRow);
parquetWriter.close();

@@ -122,7 +122,8 @@ private void createFileWriter() {
schema,
metadata,
channelName,
clientBufferParameters.getMaxChunkSizeInBytes());
clientBufferParameters.getMaxChunkSizeInBytes(),
clientBufferParameters.getBdecParquetCompression());
} else {
this.bdecParquetWriter = null;
}
@@ -323,7 +324,8 @@ public Flusher<ParquetChunkData> createFlusher() {
return new ParquetFlusher(
schema,
clientBufferParameters.getEnableParquetInternalBuffering(),
clientBufferParameters.getMaxChunkSizeInBytes());
clientBufferParameters.getMaxChunkSizeInBytes(),
clientBufferParameters.getBdecParquetCompression());
}

private static class ParquetColumn {
24 changes: 24 additions & 0 deletions src/main/java/net/snowflake/ingest/utils/Constants.java
@@ -5,6 +5,7 @@
package net.snowflake.ingest.utils;

import java.util.Arrays;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

/** Contains all the constants needed for Streaming Ingest */
public class Constants {
@@ -113,6 +114,29 @@ public static BdecVersion fromInt(int val) {
}
}

/**
   * Compression algorithm supported by the BDEC Parquet writer. It wraps Parquet's
   * CompressionCodecName but restricts callers to the specific values we allow.
*/
public enum BdecParquetCompression {
GZIP;

public CompressionCodecName getCompressionCodec() {
return CompressionCodecName.fromConf(this.name());
}

public static BdecParquetCompression fromName(String name) {
for (BdecParquetCompression e : BdecParquetCompression.values()) {
if (e.name().toLowerCase().equals(name.toLowerCase())) {
return e;
}
}
throw new IllegalArgumentException(
String.format(
"Unsupported BDEC_PARQUET_COMPRESSION_ALGORITHM = '%s', allowed values are %s",
name, Arrays.asList(BdecParquetCompression.values())));
}
}
// Parameters
public static final boolean DISABLE_BACKGROUND_FLUSH = false;
public static final boolean COMPRESS_BLOB_TWICE = false;
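A quick illustration of the enum added above, mirroring the unit tests later in this diff (lookup is case-insensitive and anything other than GZIP is rejected):

```java
import net.snowflake.ingest.utils.Constants;

public class BdecCompressionParsingDemo {
  public static void main(String[] args) {
    // Case-insensitive lookup resolves to the single supported codec.
    System.out.println(Constants.BdecParquetCompression.fromName("gZip")); // GZIP

    // Unsupported names fail fast with a descriptive error message.
    try {
      Constants.BdecParquetCompression.fromName("zstd");
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```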
24 changes: 23 additions & 1 deletion src/main/java/net/snowflake/ingest/utils/ParameterProvider.java
@@ -38,6 +38,9 @@ public class ParameterProvider {

public static final String MAX_CLIENT_LAG_ENABLED = "MAX_CLIENT_LAG_ENABLED".toLowerCase();

public static final String BDEC_PARQUET_COMPRESSION_ALGORITHM =
"BDEC_PARQUET_COMPRESSION_ALGORITHM".toLowerCase();

// Default values
public static final long BUFFER_FLUSH_INTERVAL_IN_MILLIS_DEFAULT = 1000;
public static final long BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS_DEFAULT = 100;
@@ -63,6 +66,9 @@ public class ParameterProvider {
public static final long MAX_ALLOWED_ROW_SIZE_IN_BYTES_DEFAULT = 64 * 1024 * 1024; // 64 MB
public static final int MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_DEFAULT = 100;

public static final Constants.BdecParquetCompression BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT =
Constants.BdecParquetCompression.GZIP;

/* Parameter that enables using internal Parquet buffers for buffering of rows before serializing.
It reduces memory consumption compared to using Java Objects for buffering.*/
public static final boolean ENABLE_PARQUET_INTERNAL_BUFFERING_DEFAULT = false;
@@ -178,6 +184,12 @@ private void setParameterMap(Map<String, Object> parameterOverrides, Properties
MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_DEFAULT,
parameterOverrides,
props);

this.updateValue(
BDEC_PARQUET_COMPRESSION_ALGORITHM,
BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT,
parameterOverrides,
props);
}

/** @return Longest interval in milliseconds between buffer flushes */
@@ -377,7 +389,6 @@ public long getMaxChunkSizeInBytes() {
return (val instanceof String) ? Long.parseLong(val.toString()) : (long) val;
}

/** @return The max allow row size (in bytes) */
public long getMaxAllowedRowSizeInBytes() {
Object val =
this.parameterMap.getOrDefault(
@@ -397,6 +408,17 @@ public int getMaxChunksInBlobAndRegistrationRequest() {
return (val instanceof String) ? Integer.parseInt(val.toString()) : (int) val;
}

/** @return BDEC compression algorithm */
public Constants.BdecParquetCompression getBdecParquetCompressionAlgorithm() {
Object val =
this.parameterMap.getOrDefault(
BDEC_PARQUET_COMPRESSION_ALGORITHM, BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT);
if (val instanceof Constants.BdecParquetCompression) {
return (Constants.BdecParquetCompression) val;
}
return Constants.BdecParquetCompression.fromName((String) val);
}

@Override
public String toString() {
return "ParameterProvider{" + "parameterMap=" + parameterMap + '}';
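And a small sketch of the new getter in ParameterProvider, based on the constructor the tests below use (the override value may be passed as a String or as the enum itself):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.ParameterProvider;

public class ParameterProviderCompressionDemo {
  public static void main(String[] args) {
    Map<String, Object> overrides = new HashMap<>();
    // Key is the lowercased parameter name; a String value is parsed via fromName().
    overrides.put(ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM, "gzip");

    ParameterProvider provider = new ParameterProvider(overrides, new Properties());
    Constants.BdecParquetCompression codec = provider.getBdecParquetCompressionAlgorithm();
    System.out.println(codec); // GZIP

    // With no override, the default (also GZIP in this commit) is returned.
    System.out.println(
        new ParameterProvider(new HashMap<>(), new Properties())
            .getBdecParquetCompressionAlgorithm());
  }
}
```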
@@ -17,7 +17,6 @@
import org.apache.parquet.column.values.factory.DefaultV1ValuesWriterFactory;
import org.apache.parquet.crypto.FileEncryptionProperties;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.io.DelegatingPositionOutputStream;
import org.apache.parquet.io.OutputFile;
import org.apache.parquet.io.ParquetEncodingException;
@@ -51,7 +50,8 @@ public BdecParquetWriter(
MessageType schema,
Map<String, String> extraMetaData,
String channelName,
long maxChunkSizeInBytes)
long maxChunkSizeInBytes,
Constants.BdecParquetCompression bdecParquetCompression)
throws IOException {
OutputFile file = new ByteArrayOutputFile(stream, maxChunkSizeInBytes);
ParquetProperties encodingProps = createParquetProperties();
@@ -86,7 +86,8 @@ public BdecParquetWriter(
*/
codecFactory = new CodecFactory(conf, ParquetWriter.DEFAULT_PAGE_SIZE);
@SuppressWarnings("deprecation") // Parquet does not support the new one now
CodecFactory.BytesCompressor compressor = codecFactory.getCompressor(CompressionCodecName.GZIP);
CodecFactory.BytesCompressor compressor =
codecFactory.getCompressor(bdecParquetCompression.getCompressionCodec());
writer =
new InternalParquetRecordWriter<>(
fileWriter,
@@ -1,8 +1,11 @@
package net.snowflake.ingest.streaming.internal;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.ParameterProvider;
import org.junit.Assert;
import org.junit.Test;
@@ -20,6 +23,7 @@ private Map<String, Object> getStartingParameterMap() {
parameterMap.put(ParameterProvider.BLOB_UPLOAD_MAX_RETRY_COUNT, 100);
parameterMap.put(ParameterProvider.MAX_MEMORY_LIMIT_IN_BYTES, 1000L);
parameterMap.put(ParameterProvider.MAX_CHANNEL_SIZE_IN_BYTES, 1000000L);
parameterMap.put(ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM, "gzip");
return parameterMap;
}

@@ -39,6 +43,9 @@ public void withValuesSet() {
Assert.assertEquals(100, parameterProvider.getBlobUploadMaxRetryCount());
Assert.assertEquals(1000L, parameterProvider.getMaxMemoryLimitInBytes());
Assert.assertEquals(1000000L, parameterProvider.getMaxChannelSizeInBytes());
Assert.assertEquals(
Constants.BdecParquetCompression.GZIP,
parameterProvider.getBdecParquetCompressionAlgorithm());
}

@Test
@@ -130,6 +137,9 @@ public void withDefaultValues() {
Assert.assertEquals(
ParameterProvider.MAX_CHANNEL_SIZE_IN_BYTES_DEFAULT,
parameterProvider.getMaxChannelSizeInBytes());
Assert.assertEquals(
ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT,
parameterProvider.getBdecParquetCompressionAlgorithm());
}

@Test
@@ -281,4 +291,36 @@ public void testMaxChunksInBlobAndRegistrationRequest() {
ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop);
Assert.assertEquals(1, parameterProvider.getMaxChunksInBlobAndRegistrationRequest());
}

@Test
public void testValidCompressionAlgorithmsAndWithUppercaseLowerCase() {
List<String> gzipValues = Arrays.asList("GZIP", "gzip", "Gzip", "gZip");
gzipValues.forEach(
v -> {
Properties prop = new Properties();
Map<String, Object> parameterMap = getStartingParameterMap();
parameterMap.put(ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM, v);
ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop);
Assert.assertEquals(
Constants.BdecParquetCompression.GZIP,
parameterProvider.getBdecParquetCompressionAlgorithm());
});
}

@Test
public void testInvalidCompressionAlgorithm() {
Properties prop = new Properties();
Map<String, Object> parameterMap = getStartingParameterMap();
parameterMap.put(ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM, "invalid_comp");
ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop);
try {
parameterProvider.getBdecParquetCompressionAlgorithm();
Assert.fail("Should not have succeeded");
} catch (IllegalArgumentException e) {
Assert.assertEquals(
"Unsupported BDEC_PARQUET_COMPRESSION_ALGORITHM = 'invalid_comp', allowed values are"
+ " [GZIP]",
e.getMessage());
}
}
}
@@ -120,7 +120,8 @@ private AbstractRowBuffer<?> createTestBuffer(OpenChannelRequest.OnErrorOption o
ClientBufferParameters.test_createClientBufferParameters(
enableParquetMemoryOptimization,
MAX_CHUNK_SIZE_IN_BYTES_DEFAULT,
MAX_ALLOWED_ROW_SIZE_IN_BYTES_DEFAULT));
MAX_ALLOWED_ROW_SIZE_IN_BYTES_DEFAULT,
Constants.BdecParquetCompression.GZIP));
}

@Test