SNOW-1031623 First attempt at activating Parquet Version 2
sfc-gh-lthiede committed Jan 31, 2024
1 parent 2659e4d commit e4f906c
Showing 6 changed files with 103 additions and 12 deletions.
@@ -18,23 +18,29 @@ public class ClientBufferParameters {

private Constants.BdecParquetCompression bdecParquetCompression;

private Constants.BdecParquetVersion bdecParquetVersion;

/**
* Private constructor used for test methods
*
* @param enableParquetInternalBuffering flag whether buffering in internal Parquet buffers is
* enabled
* @param maxChunkSizeInBytes maximum chunk size in bytes
* @param maxAllowedRowSizeInBytes maximum row size in bytes
* @param bdecParquetCompression compression algorithm used by parquet
* @param bdecParquetVersion version of parquet used in bdec files
*/
private ClientBufferParameters(
boolean enableParquetInternalBuffering,
long maxChunkSizeInBytes,
long maxAllowedRowSizeInBytes,
Constants.BdecParquetCompression bdecParquetCompression) {
Constants.BdecParquetCompression bdecParquetCompression,
Constants.BdecParquetVersion bdecParquetVersion) {
this.enableParquetInternalBuffering = enableParquetInternalBuffering;
this.maxChunkSizeInBytes = maxChunkSizeInBytes;
this.maxAllowedRowSizeInBytes = maxAllowedRowSizeInBytes;
this.bdecParquetCompression = bdecParquetCompression;
this.bdecParquetVersion = bdecParquetVersion;
}

/** @param clientInternal reference to the client object where the relevant parameters are set */
@@ -55,25 +61,33 @@ public ClientBufferParameters(SnowflakeStreamingIngestClientInternal clientInternal
clientInternal != null
? clientInternal.getParameterProvider().getBdecParquetCompressionAlgorithm()
: ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT;
this.bdecParquetVersion =
clientInternal != null
? clientInternal.getParameterProvider().getBdecParquetVersion()
: ParameterProvider.BDEC_PARQUET_VERSION_DEFAULT;
}

/**
* @param enableParquetInternalBuffering flag whether buffering in internal Parquet buffers is
* enabled
* @param maxChunkSizeInBytes maximum chunk size in bytes
* @param maxAllowedRowSizeInBytes maximum row size in bytes
* @param bdecParquetCompression compression algorithm used by parquet
* @param bdecParquetVersion version of parquet used in bdec files
* @return ClientBufferParameters object
*/
public static ClientBufferParameters test_createClientBufferParameters(
boolean enableParquetInternalBuffering,
long maxChunkSizeInBytes,
long maxAllowedRowSizeInBytes,
Constants.BdecParquetCompression bdecParquetCompression) {
Constants.BdecParquetCompression bdecParquetCompression,
Constants.BdecParquetVersion bdecParquetVersion) {
return new ClientBufferParameters(
enableParquetInternalBuffering,
maxChunkSizeInBytes,
maxAllowedRowSizeInBytes,
bdecParquetCompression);
bdecParquetCompression,
bdecParquetVersion);
}

public boolean getEnableParquetInternalBuffering() {
@@ -91,4 +105,8 @@ public long getMaxAllowedRowSizeInBytes() {
public Constants.BdecParquetCompression getBdecParquetCompression() {
return bdecParquetCompression;
}

public Constants.BdecParquetVersion getBdecParquetVersion() {
return bdecParquetVersion;
}
}
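For illustration, a caller of the test factory now passes the Parquet version as a fifth argument. A minimal sketch, where the numeric values are placeholders and only the last argument is new in this commit:

ClientBufferParameters params =
    ClientBufferParameters.test_createClientBufferParameters(
        false,                                    // enableParquetInternalBuffering
        32 * 1024 * 1024,                         // maxChunkSizeInBytes (placeholder)
        1024 * 1024,                              // maxAllowedRowSizeInBytes (placeholder)
        Constants.BdecParquetCompression.GZIP,    // existing default compression
        Constants.BdecParquetVersion.PARQUET_2_0  // new: Parquet writer version
    );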
@@ -30,6 +30,8 @@ public class ParquetFlusher implements Flusher<ParquetChunkData> {

private final Constants.BdecParquetCompression bdecParquetCompression;

private final Constants.BdecParquetVersion bdecParquetVersion;

/**
* Construct parquet flusher from its schema and set flag that indicates whether Parquet memory
* optimization is enabled, i.e. rows will be buffered in internal Parquet buffer.
@@ -38,11 +40,13 @@ public ParquetFlusher(
MessageType schema,
boolean enableParquetInternalBuffering,
long maxChunkSizeInBytes,
Constants.BdecParquetCompression bdecParquetCompression) {
Constants.BdecParquetCompression bdecParquetCompression,
Constants.BdecParquetVersion bdecParquetVersion) {
this.schema = schema;
this.enableParquetInternalBuffering = enableParquetInternalBuffering;
this.maxChunkSizeInBytes = maxChunkSizeInBytes;
this.bdecParquetCompression = bdecParquetCompression;
this.bdecParquetVersion = bdecParquetVersion;
}

@Override
@@ -210,7 +214,8 @@ private SerializationResult serializeFromJavaObjects(
metadata,
firstChannelFullyQualifiedTableName,
maxChunkSizeInBytes,
bdecParquetCompression);
bdecParquetCompression,
bdecParquetVersion);
rows.forEach(parquetWriter::writeRow);
parquetWriter.close();

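The flusher simply threads the version through to the writer it creates. A sketch of constructing it, assuming a MessageType schema is already in scope:

Flusher<ParquetChunkData> flusher =
    new ParquetFlusher(
        schema,                                   // MessageType, assumed in scope
        false,                                    // enableParquetInternalBuffering
        32 * 1024 * 1024,                         // maxChunkSizeInBytes (placeholder)
        Constants.BdecParquetCompression.GZIP,
        Constants.BdecParquetVersion.PARQUET_2_0);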
@@ -123,7 +123,8 @@ private void createFileWriter() {
metadata,
channelName,
clientBufferParameters.getMaxChunkSizeInBytes(),
clientBufferParameters.getBdecParquetCompression());
clientBufferParameters.getBdecParquetCompression(),
clientBufferParameters.getBdecParquetVersion());
} else {
this.bdecParquetWriter = null;
}
@@ -325,7 +326,8 @@ public Flusher<ParquetChunkData> createFlusher() {
schema,
clientBufferParameters.getEnableParquetInternalBuffering(),
clientBufferParameters.getMaxChunkSizeInBytes(),
clientBufferParameters.getBdecParquetCompression());
clientBufferParameters.getBdecParquetCompression(),
clientBufferParameters.getBdecParquetVersion());
}

private static class ParquetColumn {
43 changes: 43 additions & 0 deletions src/main/java/net/snowflake/ingest/utils/Constants.java
@@ -5,6 +5,11 @@
package net.snowflake.ingest.utils;

import java.util.Arrays;

import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.values.factory.DefaultV1ValuesWriterFactory;
import org.apache.parquet.column.values.factory.DefaultV2ValuesWriterFactory;
import org.apache.parquet.column.values.factory.ValuesWriterFactory;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

/** Contains all the constants needed for Streaming Ingest */
@@ -137,6 +142,44 @@ public static BdecParquetCompression fromName(String name) {
name, Arrays.asList(BdecParquetCompression.values())));
}
}

/**
* Version of Parquet. A wrapper around Parquet's ParquetProperties.WriterVersion that lets us
* control the available options, even though we currently support all existing options.
*/
public enum BdecParquetVersion {
PARQUET_1_0,
PARQUET_2_0;

public ParquetProperties.WriterVersion getWriterVersion() {
return ParquetProperties.WriterVersion.valueOf(this.name());
}

public ValuesWriterFactory getValuesWriterFactory() {
if (this == PARQUET_1_0) {
return new DefaultV1ValuesWriterFactory();
} else if (this == PARQUET_2_0) {
return new DefaultV2ValuesWriterFactory();
} else {
throw new IllegalArgumentException(
String.format(
"Unsupported BDEC_PARQUET_VERSION = '%s', allowed values are \"PARQUET_1_0\", \"PARQUET_2_0\"",
this.name()));
}
}

public static BdecParquetVersion fromName(String name) {
for (BdecParquetVersion v : BdecParquetVersion.values()) {
if (v.name().toLowerCase().equals(name.toLowerCase())) {
return v;
}
}
throw new IllegalArgumentException(
String.format(
"Unsupported BDEC_PARQUET_VERSION = '%s', allowed values are %s",
name, Arrays.asList(BdecParquetVersion.values())));
}
}
// Parameters
public static final boolean DISABLE_BACKGROUND_FLUSH = false;
public static final boolean COMPRESS_BLOB_TWICE = false;
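The enum's methods cover case-insensitive parsing plus the mapping onto Parquet's own types. A small sketch of the round trip, using only names visible in the diff above:

// fromName is case-insensitive, so configuration strings need not be upper-cased.
Constants.BdecParquetVersion version = Constants.BdecParquetVersion.fromName("parquet_2_0");
// Maps to ParquetProperties.WriterVersion.PARQUET_2_0 via valueOf on the enum name.
ParquetProperties.WriterVersion writerVersion = version.getWriterVersion();
// Returns a DefaultV2ValuesWriterFactory for PARQUET_2_0 (the V1 factory for PARQUET_1_0).
ValuesWriterFactory factory = version.getValuesWriterFactory();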
22 changes: 22 additions & 0 deletions src/main/java/net/snowflake/ingest/utils/ParameterProvider.java
@@ -39,6 +39,8 @@ public class ParameterProvider {
public static final String BDEC_PARQUET_COMPRESSION_ALGORITHM =
"BDEC_PARQUET_COMPRESSION_ALGORITHM".toLowerCase();

public static final String BDEC_PARQUET_VERSION = "BDEC_PARQUET_VERSION".toLowerCase();

// Default values
public static final long BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS_DEFAULT = 100;
public static final long INSERT_THROTTLE_INTERVAL_IN_MILLIS_DEFAULT = 1000;
@@ -64,6 +66,9 @@ public class ParameterProvider {
public static final Constants.BdecParquetCompression BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT =
Constants.BdecParquetCompression.GZIP;

public static final Constants.BdecParquetVersion BDEC_PARQUET_VERSION_DEFAULT =
Constants.BdecParquetVersion.PARQUET_2_0;

/* Parameter that enables using internal Parquet buffers for buffering of rows before serializing.
It reduces memory consumption compared to using Java Objects for buffering.*/
public static final boolean ENABLE_PARQUET_INTERNAL_BUFFERING_DEFAULT = false;
@@ -188,6 +193,12 @@ private void setParameterMap(Map<String, Object> parameterOverrides, Properties props) {
BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT,
parameterOverrides,
props);

this.updateValue(
BDEC_PARQUET_VERSION,
BDEC_PARQUET_VERSION_DEFAULT,
parameterOverrides,
props);
}

/** @return Longest interval in milliseconds between buffer flushes */
@@ -407,6 +418,17 @@ public Constants.BdecParquetCompression getBdecParquetCompressionAlgorithm() {
return Constants.BdecParquetCompression.fromName((String) val);
}

/** @return BDEC parquet version */
public Constants.BdecParquetVersion getBdecParquetVersion() {
Object val =
this.parameterMap.getOrDefault(
BDEC_PARQUET_VERSION, BDEC_PARQUET_VERSION_DEFAULT);
if (val instanceof Constants.BdecParquetVersion) {
return (Constants.BdecParquetVersion) val;
}
return Constants.BdecParquetVersion.fromName((String) val);
}

@Override
public String toString() {
return "ParameterProvider{" + "parameterMap=" + parameterMap + '}';
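Because getBdecParquetVersion accepts either the enum or its string name, a caller could override the default roughly as follows. The two-argument ParameterProvider constructor is an assumption inferred from the setParameterMap signature in the diff:

Map<String, Object> overrides = new HashMap<>();
overrides.put(ParameterProvider.BDEC_PARQUET_VERSION, "PARQUET_1_0"); // a String or the enum both resolve
ParameterProvider provider = new ParameterProvider(overrides, null);  // assumed constructor (overrides, props)
Constants.BdecParquetVersion version = provider.getBdecParquetVersion(); // -> PARQUET_1_0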
11 changes: 6 additions & 5 deletions src/main/java/org/apache/parquet/hadoop/BdecParquetWriter.java
@@ -51,10 +51,11 @@ public BdecParquetWriter(
Map<String, String> extraMetaData,
String channelName,
long maxChunkSizeInBytes,
Constants.BdecParquetCompression bdecParquetCompression)
Constants.BdecParquetCompression bdecParquetCompression,
Constants.BdecParquetVersion bdecParquetVersion)
throws IOException {
OutputFile file = new ByteArrayOutputFile(stream, maxChunkSizeInBytes);
ParquetProperties encodingProps = createParquetProperties();
ParquetProperties encodingProps = createParquetProperties(bdecParquetVersion);
Configuration conf = new Configuration();
WriteSupport<List<Object>> writeSupport =
new BdecWriteSupport(schema, extraMetaData, channelName);
@@ -119,7 +120,7 @@ public void close() throws IOException {
}
}

private static ParquetProperties createParquetProperties() {
private static ParquetProperties createParquetProperties(Constants.BdecParquetVersion bdecParquetVersion) {
/**
* There are two main limitations on the server side that we have to overcome by tweaking
* Parquet limits:
@@ -147,8 +148,8 @@ private static ParquetProperties createParquetProperties() {
return ParquetProperties.builder()
// PARQUET_2_0 uses Encoding.DELTA_BYTE_ARRAY for byte arrays (e.g. SF sb16)
// server side does not support it TODO: SNOW-657238
.withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
.withValuesWriterFactory(new DefaultV1ValuesWriterFactory())
.withWriterVersion(bdecParquetVersion.getWriterVersion())
.withValuesWriterFactory(bdecParquetVersion.getValuesWriterFactory())
// the dictionary encoding (Encoding.*_DICTIONARY) is not supported by server side
// scanner yet
.withDictionaryEncoding(false)
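Putting it together, the writer receives the version as its last constructor argument. The leading parameters (output stream, schema, metadata) are elided in the diff, so this sketch assumes they are in scope; the constructor throws IOException per the signature above:

BdecParquetWriter writer =
    new BdecParquetWriter(
        stream,                                   // ByteArrayOutputStream, assumed in scope
        schema,                                   // MessageType, assumed in scope
        extraMetaData,                            // Map<String, String>, assumed in scope
        "example-channel",
        32 * 1024 * 1024,                         // maxChunkSizeInBytes (placeholder)
        Constants.BdecParquetCompression.GZIP,
        Constants.BdecParquetVersion.PARQUET_2_0);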
