Skip to content

Commit

Permalink
SNOW-1764301 Iceberg parquet file configuration (#874)
Browse files: browse the repository at this point in the history
  • Loading branch information
sfc-gh-alhuang authored Oct 29, 2024
1 parent 59b686d commit 70d4aa9
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.util.Optional;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.ParameterProvider;
import org.apache.parquet.column.ParquetProperties;

/** Channel's buffer relevant parameters that are set at the owning client level. */
public class ClientBufferParameters {
Expand All @@ -29,6 +30,8 @@ public class ClientBufferParameters {

private boolean enableValuesCount;

private boolean enableDictionaryEncoding;

/**
* Private constructor used for test methods
*
Expand All @@ -53,10 +56,13 @@ private ClientBufferParameters(
this.enableIcebergStreaming = enableIcebergStreaming;
this.enableDistinctValuesCount = enableDistinctValuesCount;
this.enableValuesCount = enableValuesCount;
this.enableDictionaryEncoding = enableIcebergStreaming;
}

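/**
 * @param clientInternal reference to the client object where the relevant parameters are set
 * @param parquetWriterVersion Parquet writer version; dictionary encoding is enabled only when
 *     Iceberg streaming is enabled and this is {@code PARQUET_2_0}
 */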
public ClientBufferParameters(SnowflakeStreamingIngestClientInternal clientInternal) {
public ClientBufferParameters(
SnowflakeStreamingIngestClientInternal clientInternal,
ParquetProperties.WriterVersion parquetWriterVersion) {
this.maxChunkSizeInBytes =
clientInternal != null
? clientInternal.getParameterProvider().getMaxChunkSizeInBytes()
Expand Down Expand Up @@ -89,6 +95,9 @@ public ClientBufferParameters(SnowflakeStreamingIngestClientInternal clientInter
clientInternal != null
? clientInternal.getInternalParameterProvider().isEnableValuesCount()
: InternalParameterProvider.ENABLE_VALUES_COUNT_DEFAULT;
this.enableDictionaryEncoding =
enableIcebergStreaming
&& parquetWriterVersion == ParquetProperties.WriterVersion.PARQUET_2_0;
}

/**
Expand Down Expand Up @@ -154,6 +163,6 @@ public boolean isEnableValuesCount() {
}

public boolean isEnableDictionaryEncoding() {
return enableIcebergStreaming;
return enableDictionaryEncoding;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ class SnowflakeStreamingIngestChannelInternal<T> implements SnowflakeStreamingIn
getFullyQualifiedName(),
this::collectRowSize,
channelState,
new ClientBufferParameters(owningClient),
new ClientBufferParameters(owningClient, parquetWriterVersion),
offsetTokenVerificationFunction,
parquetWriterVersion,
owningClient.getTelemetryService());
Expand Down
12 changes: 10 additions & 2 deletions src/main/java/net/snowflake/ingest/utils/ParameterProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ public class ParameterProvider {
public static final Constants.BdecParquetCompression BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT =
Constants.BdecParquetCompression.GZIP;

public static final Constants.BdecParquetCompression
ICEBERG_PARQUET_COMPRESSION_ALGORITHM_DEFAULT = Constants.BdecParquetCompression.ZSTD;

/* Iceberg mode parameters: when streaming in Iceberg mode, different default parameters are required because Parquet files are generated instead of BDEC files. */
public static final int MAX_CHUNKS_IN_BLOB_ICEBERG_MODE_DEFAULT = 1;

Expand Down Expand Up @@ -253,7 +256,9 @@ private void setParameterMap(Map<String, Object> parameterOverrides, Properties

this.checkAndUpdate(
BDEC_PARQUET_COMPRESSION_ALGORITHM,
BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT,
isEnableIcebergStreaming()
? ICEBERG_PARQUET_COMPRESSION_ALGORITHM_DEFAULT
: BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT,
parameterOverrides,
props,
false /* enforceDefault */);
Expand Down Expand Up @@ -488,7 +493,10 @@ public int getMaxChunksInRegistrationRequest() {
public Constants.BdecParquetCompression getBdecParquetCompressionAlgorithm() {
Object val =
this.parameterMap.getOrDefault(
BDEC_PARQUET_COMPRESSION_ALGORITHM, BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT);
BDEC_PARQUET_COMPRESSION_ALGORITHM,
isEnableIcebergStreaming()
? ICEBERG_PARQUET_COMPRESSION_ALGORITHM_DEFAULT
: BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT);
if (val instanceof Constants.BdecParquetCompression) {
return (Constants.BdecParquetCompression) val;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,9 @@ public void withDefaultValues() {
ParameterProvider.MAX_CHANNEL_SIZE_IN_BYTES_DEFAULT,
parameterProvider.getMaxChannelSizeInBytes());
Assert.assertEquals(
ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT,
enableIcebergStreaming
? ParameterProvider.ICEBERG_PARQUET_COMPRESSION_ALGORITHM_DEFAULT
: ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT,
parameterProvider.getBdecParquetCompressionAlgorithm());
Assert.assertEquals(
enableIcebergStreaming
Expand Down

0 comments on commit 70d4aa9

Please sign in to comment.