From 70d4aa9b4b3753201f9cf948ecbe591ecab7cd91 Mon Sep 17 00:00:00 2001 From: Alec Huang Date: Tue, 29 Oct 2024 15:44:46 -0700 Subject: [PATCH] SNOW-1764301 Iceberg parquet file configuration (#874) --- .../streaming/internal/ClientBufferParameters.java | 13 +++++++++++-- .../SnowflakeStreamingIngestChannelInternal.java | 2 +- .../snowflake/ingest/utils/ParameterProvider.java | 12 ++++++++++-- .../streaming/internal/ParameterProviderTest.java | 4 +++- 4 files changed, 25 insertions(+), 6 deletions(-) diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java b/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java index 6a15beba3..d1a93f9b3 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java @@ -7,6 +7,7 @@ import java.util.Optional; import net.snowflake.ingest.utils.Constants; import net.snowflake.ingest.utils.ParameterProvider; +import org.apache.parquet.column.ParquetProperties; /** Channel's buffer relevant parameters that are set at the owning client level. */ public class ClientBufferParameters { @@ -29,6 +30,8 @@ public class ClientBufferParameters { private boolean enableValuesCount; + private boolean enableDictionaryEncoding; + /** * Private constructor used for test methods * @@ -53,10 +56,13 @@ private ClientBufferParameters( this.enableIcebergStreaming = enableIcebergStreaming; this.enableDistinctValuesCount = enableDistinctValuesCount; this.enableValuesCount = enableValuesCount; + this.enableDictionaryEncoding = enableIcebergStreaming; } /** @param clientInternal reference to the client object where the relevant parameters are set */ - public ClientBufferParameters(SnowflakeStreamingIngestClientInternal clientInternal) { + public ClientBufferParameters( + SnowflakeStreamingIngestClientInternal clientInternal, + ParquetProperties.WriterVersion parquetWriterVersion) { this.maxChunkSizeInBytes = clientInternal != null ? clientInternal.getParameterProvider().getMaxChunkSizeInBytes() @@ -89,6 +95,9 @@ public ClientBufferParameters(SnowflakeStreamingIngestClientInternal clientInter clientInternal != null ? clientInternal.getInternalParameterProvider().isEnableValuesCount() : InternalParameterProvider.ENABLE_VALUES_COUNT_DEFAULT; + this.enableDictionaryEncoding = + enableIcebergStreaming + && parquetWriterVersion == ParquetProperties.WriterVersion.PARQUET_2_0; } /** @@ -154,6 +163,6 @@ public boolean isEnableValuesCount() { } public boolean isEnableDictionaryEncoding() { - return enableIcebergStreaming; + return enableDictionaryEncoding; } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java index 167240ed7..ee06eedcc 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java @@ -109,7 +109,7 @@ class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingIn getFullyQualifiedName(), this::collectRowSize, channelState, - new ClientBufferParameters(owningClient), + new ClientBufferParameters(owningClient, parquetWriterVersion), offsetTokenVerificationFunction, parquetWriterVersion, owningClient.getTelemetryService()); diff --git a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java index 81b17ddcf..8937ccd3c 100644 --- a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java +++ b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java @@ -76,6 +76,9 @@ public class ParameterProvider { public static final Constants.BdecParquetCompression BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT = Constants.BdecParquetCompression.GZIP; + public static final Constants.BdecParquetCompression + ICEBERG_PARQUET_COMPRESSION_ALGORITHM_DEFAULT = Constants.BdecParquetCompression.ZSTD; + /* Iceberg mode parameters: When streaming to Iceberg mode, different default parameters are required because it generates Parquet files instead of BDEC files. */ public static final int MAX_CHUNKS_IN_BLOB_ICEBERG_MODE_DEFAULT = 1; @@ -253,7 +256,9 @@ private void setParameterMap(Map parameterOverrides, Properties this.checkAndUpdate( BDEC_PARQUET_COMPRESSION_ALGORITHM, - BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT, + isEnableIcebergStreaming() + ? ICEBERG_PARQUET_COMPRESSION_ALGORITHM_DEFAULT + : BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT, parameterOverrides, props, false /* enforceDefault */); @@ -488,7 +493,10 @@ public int getMaxChunksInRegistrationRequest() { public Constants.BdecParquetCompression getBdecParquetCompressionAlgorithm() { Object val = this.parameterMap.getOrDefault( - BDEC_PARQUET_COMPRESSION_ALGORITHM, BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT); + BDEC_PARQUET_COMPRESSION_ALGORITHM, + isEnableIcebergStreaming() + ? ICEBERG_PARQUET_COMPRESSION_ALGORITHM_DEFAULT + : BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT); if (val instanceof Constants.BdecParquetCompression) { return (Constants.BdecParquetCompression) val; } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java index 3d7b36291..ea4a92f6f 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java @@ -158,7 +158,9 @@ public void withDefaultValues() { ParameterProvider.MAX_CHANNEL_SIZE_IN_BYTES_DEFAULT, parameterProvider.getMaxChannelSizeInBytes()); Assert.assertEquals( - ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT, + enableIcebergStreaming + ? ParameterProvider.ICEBERG_PARQUET_COMPRESSION_ALGORITHM_DEFAULT + : ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT, parameterProvider.getBdecParquetCompressionAlgorithm()); Assert.assertEquals( enableIcebergStreaming