Skip to content

Commit

Permalink
Add parameter requirements
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-alhuang committed Jun 11, 2024
1 parent 022a2d8 commit 489307a
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ public class SnowflakeStreamingIngestClientInternal<T> implements SnowflakeStrea
* @param accountURL Snowflake account url
* @param prop connection properties
* @param parameterOverrides map of parameters to override for this client
* @param isIcebergMode whether the client is streaming to Iceberg tables
* @param isTestMode indicates whether it's under test mode
*/
public SnowflakeStreamingIngestClientInternal(
Expand Down
16 changes: 8 additions & 8 deletions src/main/java/net/snowflake/ingest/utils/ParameterProvider.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package net.snowflake.ingest.utils;

import static net.snowflake.ingest.utils.ErrorCode.INVALID_CONFIG_PARAMETER;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
Expand Down Expand Up @@ -39,8 +41,6 @@ public class ParameterProvider {
public static final String BDEC_PARQUET_COMPRESSION_ALGORITHM =
"BDEC_PARQUET_COMPRESSION_ALGORITHM".toLowerCase();

private static final Logging logger = new Logging(ParameterProvider.class);

// Default values
public static final long BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS_DEFAULT = 100;
public static final long INSERT_THROTTLE_INTERVAL_IN_MILLIS_DEFAULT = 1000;
Expand All @@ -66,8 +66,9 @@ public class ParameterProvider {
public static final Constants.BdecParquetCompression BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT =
Constants.BdecParquetCompression.GZIP;

// Iceberg mode parameters
public static final int MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_ICEBERG_MODE_DEFAULT = 1;
/* Iceberg mode parameters: When streaming in Iceberg mode, different default parameters are required because Parquet files are generated instead of BDEC files. */
public static final int MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_ICEBERG_MODE_DEFAULT =
1; // 1 parquet file per blob

/* Parameter that enables using internal Parquet buffers for buffering of rows before serializing.
It reduces memory consumption compared to using Java Objects for buffering.*/
Expand Down Expand Up @@ -437,11 +438,10 @@ public String toString() {
private void icebergModeValidation(String key, Object expected) {
Object val = this.parameterMap.get(key);
if (!val.equals(expected)) {
logger.logWarn(
throw new SFException(
INVALID_CONFIG_PARAMETER,
String.format(
"The value %s for %s is invalid in Iceberg mode, %s will be used instead.",
val, key, expected));
this.parameterMap.put(key, expected);
"The value %s for %s is invalid in Iceberg mode, should be %s.", val, key, expected));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
import java.util.Map;
import java.util.Properties;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.ParameterProvider;
import net.snowflake.ingest.utils.SFException;
import org.junit.Assert;
import org.junit.Test;

Expand Down Expand Up @@ -333,9 +335,17 @@ public void testMaxChunksInBlobAndRegistrationRequest() {
ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false);
Assert.assertEquals(1, parameterProvider.getMaxChunksInBlobAndRegistrationRequest());

parameterMap.put("max_chunks_in_blob_and_registration_request", 100);
parameterProvider = new ParameterProvider(parameterMap, prop, true);
Assert.assertEquals(1, parameterProvider.getMaxChunksInBlobAndRegistrationRequest());

SFException e =
Assert.assertThrows(
SFException.class,
() -> {
parameterMap.put("max_chunks_in_blob_and_registration_request", 100);
new ParameterProvider(parameterMap, prop, true);
});
Assert.assertEquals(e.getVendorCode(), ErrorCode.INVALID_CONFIG_PARAMETER.getMessageCode());
}

@Test
Expand Down

0 comments on commit 489307a

Please sign in to comment.