From 489307a969c642bd5d7d8adaa2821e9690b6f77a Mon Sep 17 00:00:00 2001 From: Alec Huang Date: Tue, 11 Jun 2024 00:27:31 +0000 Subject: [PATCH] Add parameter requirements --- .../SnowflakeStreamingIngestClientInternal.java | 1 + .../ingest/utils/ParameterProvider.java | 16 ++++++++-------- .../internal/ParameterProviderTest.java | 12 +++++++++++- 3 files changed, 20 insertions(+), 9 deletions(-) diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java index 86695de8c..13cc673ff 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java @@ -257,6 +257,7 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea * @param accountURL Snowflake account url * @param prop connection properties * @param parameterOverrides map of parameters to override for this client + * @param isIcebergMode whether we're streaming to iceberg tables * @param isTestMode indicates whether it's under test mode */ public SnowflakeStreamingIngestClientInternal( diff --git a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java index 31cedcd9b..33f791ca5 100644 --- a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java +++ b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java @@ -1,5 +1,7 @@ package net.snowflake.ingest.utils; +import static net.snowflake.ingest.utils.ErrorCode.INVALID_CONFIG_PARAMETER; + import java.util.HashMap; import java.util.Map; import java.util.Properties; @@ -39,8 +41,6 @@ public class ParameterProvider { public static final String BDEC_PARQUET_COMPRESSION_ALGORITHM = "BDEC_PARQUET_COMPRESSION_ALGORITHM".toLowerCase(); - private static final Logging logger = new Logging(ParameterProvider.class); - // Default values public static final long BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS_DEFAULT = 100; public static final long INSERT_THROTTLE_INTERVAL_IN_MILLIS_DEFAULT = 1000; @@ -66,8 +66,9 @@ public class ParameterProvider { public static final Constants.BdecParquetCompression BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT = Constants.BdecParquetCompression.GZIP; - // Iceberg mode parameters - public static final int MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_ICEBERG_MODE_DEFAULT = 1; + /* Iceberg mode parameters: When streaming to Iceberg mode, different default parameters are required because it generates Parquet files instead of BDEC files. */ + public static final int MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_ICEBERG_MODE_DEFAULT = + 1; // 1 parquet file per blob /* Parameter that enables using internal Parquet buffers for buffering of rows before serializing. It reduces memory consumption compared to using Java Objects for buffering.*/ @@ -437,11 +438,10 @@ public String toString() { private void icebergModeValidation(String key, Object expected) { Object val = this.parameterMap.get(key); if (!val.equals(expected)) { - logger.logWarn( + throw new SFException( + INVALID_CONFIG_PARAMETER, String.format( - "The value %s for %s is invalid in Iceberg mode, %s will be used instead.", - val, key, expected)); - this.parameterMap.put(key, expected); + "The value %s for %s is invalid in Iceberg mode, should be %s.", val, key, expected)); } } } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java index ea97a05d0..fb73aaa66 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java @@ -8,7 +8,9 @@ import java.util.Map; import java.util.Properties; import net.snowflake.ingest.utils.Constants; +import net.snowflake.ingest.utils.ErrorCode; import net.snowflake.ingest.utils.ParameterProvider; +import net.snowflake.ingest.utils.SFException; import org.junit.Assert; import org.junit.Test; @@ -333,9 +335,17 @@ public void testMaxChunksInBlobAndRegistrationRequest() { ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); Assert.assertEquals(1, parameterProvider.getMaxChunksInBlobAndRegistrationRequest()); - parameterMap.put("max_chunks_in_blob_and_registration_request", 100); parameterProvider = new ParameterProvider(parameterMap, prop, true); Assert.assertEquals(1, parameterProvider.getMaxChunksInBlobAndRegistrationRequest()); + + SFException e = + Assert.assertThrows( + SFException.class, + () -> { + parameterMap.put("max_chunks_in_blob_and_registration_request", 100); + new ParameterProvider(parameterMap, prop, true); + }); + Assert.assertEquals(e.getVendorCode(), ErrorCode.INVALID_CONFIG_PARAMETER.getMessageCode()); } @Test