From 422ac865f6a8cb40f381df069ea594e2d08b756c Mon Sep 17 00:00:00 2001 From: Alec Huang Date: Mon, 28 Oct 2024 13:25:11 -0700 Subject: [PATCH] address comments --- .../ingest/utils/ParameterProvider.java | 31 ++++--- .../java/net/snowflake/ingest/TestUtils.java | 24 +++++ .../streaming/internal/FlushServiceTest.java | 5 +- .../internal/ParameterProviderTest.java | 92 +++++++++++++------ .../SnowflakeStreamingIngestChannelTest.java | 17 ++-- 5 files changed, 118 insertions(+), 51 deletions(-) diff --git a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java index 734788df6..32f841e4f 100644 --- a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java +++ b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java @@ -89,6 +89,9 @@ public class ParameterProvider { // Cached buffer flush interval - avoid parsing each time for quick lookup private Long cachedBufferFlushIntervalMs = -1L; + // Cached isIcebergMode - avoid parsing each time for quick lookup + private Boolean cachedIsIcebergMode = null; + /** * Constructor. Takes properties from profile file and properties from client constructor and * resolves final parameter value @@ -100,20 +103,6 @@ public ParameterProvider(Map parameterOverrides, Properties prop this.setParameterMap(parameterOverrides, props); } - /** Constructor for tests */ - public ParameterProvider( - Map parameterOverrides, Properties props, boolean isIcebergMode) { - if (parameterOverrides != null) { - parameterOverrides.put(STREAMING_ICEBERG, isIcebergMode); - } - this.setParameterMap(parameterOverrides, props); - } - - /** Empty constructor for tests */ - public ParameterProvider(boolean isIcebergMode) { - this(new HashMap<>(), null, isIcebergMode); - } - private void checkAndUpdate( String key, Object defaultValue, @@ -153,6 +142,7 @@ private void setParameterMap(Map parameterOverrides, Properties BUFFER_FLUSH_INTERVAL_IN_MILLIS, MAX_CLIENT_LAG)); } + /* STREAMING_ICEBERG should be the first thing to set as it affects other parameters */ this.checkAndUpdate( STREAMING_ICEBERG, STREAMING_ICEBERG_DEFAULT, @@ -509,8 +499,19 @@ public boolean isEnableNewJsonParsingLogic() { /** @return Whether the client is in Iceberg mode */ public boolean isIcebergMode() { + if (cachedIsIcebergMode != null) { + return cachedIsIcebergMode; + } Object val = this.parameterMap.getOrDefault(STREAMING_ICEBERG, STREAMING_ICEBERG_DEFAULT); - return (val instanceof String) ? Boolean.parseBoolean(val.toString()) : (boolean) val; + + try { + cachedIsIcebergMode = + (val instanceof String) ? Boolean.parseBoolean(val.toString()) : (boolean) val; + } catch (Throwable t) { + throw new IllegalArgumentException( + String.format("Failed to parse STREAMING_ICEBERG = '%s'", val), t); + } + return cachedIsIcebergMode; } @Override diff --git a/src/test/java/net/snowflake/ingest/TestUtils.java b/src/test/java/net/snowflake/ingest/TestUtils.java index 407cba0bc..de07d50b7 100644 --- a/src/test/java/net/snowflake/ingest/TestUtils.java +++ b/src/test/java/net/snowflake/ingest/TestUtils.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + package net.snowflake.ingest; import static net.snowflake.ingest.utils.Constants.ACCOUNT; @@ -14,6 +18,7 @@ import static net.snowflake.ingest.utils.Constants.USER; import static net.snowflake.ingest.utils.Constants.WAREHOUSE; import static net.snowflake.ingest.utils.ParameterProvider.BLOB_FORMAT_VERSION; +import static net.snowflake.ingest.utils.ParameterProvider.STREAMING_ICEBERG; import java.io.IOException; import java.math.BigDecimal; @@ -46,6 +51,7 @@ import net.snowflake.ingest.streaming.InsertValidationResponse; import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel; import net.snowflake.ingest.utils.Constants; +import net.snowflake.ingest.utils.ParameterProvider; import net.snowflake.ingest.utils.Utils; import org.apache.commons.codec.binary.Base64; import org.junit.Assert; @@ -497,6 +503,24 @@ public static URI getTokenRequestURI() { return tokenRequestURI; } + public static ParameterProvider createParameterProvider( + Map parameterOverrides, Properties props, boolean isIcebergMode) { + if (parameterOverrides != null) { + parameterOverrides.put(STREAMING_ICEBERG, isIcebergMode); + } + return new ParameterProvider(parameterOverrides, props); + } + + public static ParameterProvider createParameterProvider(boolean isIcebergMode) { + return createParameterProvider(new HashMap<>(), null, isIcebergMode); + } + + public static Properties createProps(boolean isIcebergMode) { + Properties prop = new Properties(); + prop.setProperty(ParameterProvider.STREAMING_ICEBERG, String.valueOf(isIcebergMode)); + return prop; + } + private static T nullOrIfNullable(boolean nullable, Random r, Supplier value) { return !nullable ? value.get() : (r.nextBoolean() ? value.get() : null); } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java index ad8cda3fe..56b0ebc61 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java @@ -4,6 +4,7 @@ package net.snowflake.ingest.streaming.internal; +import static net.snowflake.ingest.TestUtils.createParameterProvider; import static net.snowflake.ingest.utils.Constants.BLOB_CHECKSUM_SIZE_IN_BYTES; import static net.snowflake.ingest.utils.Constants.BLOB_CHUNK_METADATA_LENGTH_SIZE_IN_BYTES; import static net.snowflake.ingest.utils.Constants.BLOB_EXTENSION_TYPE; @@ -107,7 +108,7 @@ private abstract static class TestContext implements AutoCloseable { TestContext() { storage = Mockito.mock(InternalStage.class); - parameterProvider = new ParameterProvider(isIcebergMode); + parameterProvider = createParameterProvider(isIcebergMode); InternalParameterProvider internalParameterProvider = new InternalParameterProvider(isIcebergMode); client = Mockito.mock(SnowflakeStreamingIngestClientInternal.class); @@ -1049,7 +1050,7 @@ public void testInvalidateChannels() { // Create a new Client in order to not interfere with other tests SnowflakeStreamingIngestClientInternal client = Mockito.mock(SnowflakeStreamingIngestClientInternal.class); - ParameterProvider parameterProvider = new ParameterProvider(isIcebergMode); + ParameterProvider parameterProvider = createParameterProvider(isIcebergMode); ChannelCache channelCache = new ChannelCache<>(); InternalParameterProvider internalParameterProvider = new InternalParameterProvider(isIcebergMode); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java index d0cbbb940..61022bde6 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java @@ -9,10 +9,12 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import net.snowflake.ingest.TestUtils; import net.snowflake.ingest.utils.Constants; import net.snowflake.ingest.utils.ErrorCode; import net.snowflake.ingest.utils.ParameterProvider; import net.snowflake.ingest.utils.SFException; +import org.assertj.core.api.Assertions; import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -47,7 +49,8 @@ private Map getStartingParameterMap() { public void withValuesSet() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, isIcebergMode); + ParameterProvider parameterProvider = + TestUtils.createParameterProvider(parameterMap, prop, isIcebergMode); Assert.assertEquals(1000L, parameterProvider.getCachedMaxClientLagInMs()); Assert.assertEquals(4L, parameterProvider.getBufferFlushCheckIntervalInMs()); @@ -70,7 +73,8 @@ public void withNullProps() { parameterMap.put(ParameterProvider.BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS, 4L); parameterMap.put(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_PERCENTAGE, 6); parameterMap.put(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_BYTES, 1024); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, null, isIcebergMode); + ParameterProvider parameterProvider = + TestUtils.createParameterProvider(parameterMap, null, isIcebergMode); Assert.assertEquals(3000, parameterProvider.getCachedMaxClientLagInMs()); Assert.assertEquals(4, parameterProvider.getBufferFlushCheckIntervalInMs()); @@ -88,7 +92,8 @@ public void withNullParameterMap() { props.put(ParameterProvider.BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS, 4L); props.put(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_PERCENTAGE, 6); props.put(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_BYTES, 1024); - ParameterProvider parameterProvider = new ParameterProvider(null, props, isIcebergMode); + ParameterProvider parameterProvider = + TestUtils.createParameterProvider(null, props, isIcebergMode); Assert.assertEquals(3000, parameterProvider.getCachedMaxClientLagInMs()); Assert.assertEquals(4, parameterProvider.getBufferFlushCheckIntervalInMs()); @@ -101,7 +106,8 @@ public void withNullParameterMap() { @Test public void withNullInputs() { - ParameterProvider parameterProvider = new ParameterProvider(null, null, isIcebergMode); + ParameterProvider parameterProvider = + TestUtils.createParameterProvider(null, null, isIcebergMode); Assert.assertEquals( ParameterProvider.MAX_CLIENT_LAG_DEFAULT, parameterProvider.getCachedMaxClientLagInMs()); @@ -121,7 +127,7 @@ public void withNullInputs() { @Test public void withDefaultValues() { - ParameterProvider parameterProvider = new ParameterProvider(isIcebergMode); + ParameterProvider parameterProvider = TestUtils.createParameterProvider(isIcebergMode); Assert.assertEquals( isIcebergMode @@ -164,12 +170,27 @@ public void withDefaultValues() { parameterProvider.getMaxChunksInRegistrationRequest()); } + @Test + public void testEnforceDefaultValues() { + if (!isIcebergMode) { + return; + } + Map parameterMap = new HashMap<>(); + parameterMap.put(ParameterProvider.MAX_CHUNKS_IN_BLOB, 2); + Assertions.assertThatThrownBy( + () -> TestUtils.createParameterProvider(parameterMap, null, isIcebergMode)) + .isInstanceOf(SFException.class) + .extracting("vendorCode") + .isEqualTo(ErrorCode.INVALID_CONFIG_PARAMETER.getMessageCode()); + } + @Test public void testMaxClientLagEnabled() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "2 second"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, isIcebergMode); + ParameterProvider parameterProvider = + TestUtils.createParameterProvider(parameterMap, prop, isIcebergMode); Assert.assertEquals(2000, parameterProvider.getCachedMaxClientLagInMs()); // call again to trigger caching logic Assert.assertEquals(2000, parameterProvider.getCachedMaxClientLagInMs()); @@ -180,7 +201,8 @@ public void testMaxClientLagEnabledPluralTimeUnit() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "2 seconds"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, isIcebergMode); + ParameterProvider parameterProvider = + TestUtils.createParameterProvider(parameterMap, prop, isIcebergMode); Assert.assertEquals(2000, parameterProvider.getCachedMaxClientLagInMs()); } @@ -189,7 +211,8 @@ public void testMaxClientLagEnabledMinuteTimeUnit() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "1 minute"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, isIcebergMode); + ParameterProvider parameterProvider = + TestUtils.createParameterProvider(parameterMap, prop, isIcebergMode); Assert.assertEquals(60000, parameterProvider.getCachedMaxClientLagInMs()); } @@ -198,7 +221,8 @@ public void testMaxClientLagEnabledMinuteTimeUnitPluralTimeUnit() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "2 minutes"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, isIcebergMode); + ParameterProvider parameterProvider = + TestUtils.createParameterProvider(parameterMap, prop, isIcebergMode); Assert.assertEquals(120000, parameterProvider.getCachedMaxClientLagInMs()); } @@ -206,7 +230,8 @@ public void testMaxClientLagEnabledMinuteTimeUnitPluralTimeUnit() { public void testMaxClientLagEnabledDefaultValue() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, isIcebergMode); + ParameterProvider parameterProvider = + TestUtils.createParameterProvider(parameterMap, prop, isIcebergMode); Assert.assertEquals( ParameterProvider.MAX_CLIENT_LAG_DEFAULT, parameterProvider.getCachedMaxClientLagInMs()); } @@ -216,7 +241,8 @@ public void testMaxClientLagEnabledDefaultUnit() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "3000"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, isIcebergMode); + ParameterProvider parameterProvider = + TestUtils.createParameterProvider(parameterMap, prop, isIcebergMode); Assert.assertEquals(3000, parameterProvider.getCachedMaxClientLagInMs()); } @@ -225,7 +251,8 @@ public void testMaxClientLagEnabledLongInput() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, 3000L); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, isIcebergMode); + ParameterProvider parameterProvider = + TestUtils.createParameterProvider(parameterMap, prop, isIcebergMode); Assert.assertEquals(3000, parameterProvider.getCachedMaxClientLagInMs()); } @@ -234,7 +261,8 @@ public void testMaxClientLagEnabledMissingUnitTimeUnitSupplied() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, " year"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, isIcebergMode); + ParameterProvider parameterProvider = + TestUtils.createParameterProvider(parameterMap, prop, isIcebergMode); try { parameterProvider.getCachedMaxClientLagInMs(); Assert.fail("Should not have succeeded"); @@ -248,7 +276,8 @@ public void testMaxClientLagEnabledInvalidTimeUnit() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "1 year"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, isIcebergMode); + ParameterProvider parameterProvider = + TestUtils.createParameterProvider(parameterMap, prop, isIcebergMode); try { parameterProvider.getCachedMaxClientLagInMs(); Assert.fail("Should not have succeeded"); @@ -262,7 +291,8 @@ public void testMaxClientLagEnabledInvalidUnit() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "banana minute"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, isIcebergMode); + ParameterProvider parameterProvider = + TestUtils.createParameterProvider(parameterMap, prop, isIcebergMode); try { parameterProvider.getCachedMaxClientLagInMs(); Assert.fail("Should not have succeeded"); @@ -276,7 +306,8 @@ public void testMaxClientLagEnabledThresholdBelow() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "0 second"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, isIcebergMode); + ParameterProvider parameterProvider = + TestUtils.createParameterProvider(parameterMap, prop, isIcebergMode); try { parameterProvider.getCachedMaxClientLagInMs(); Assert.fail("Should not have succeeded"); @@ -290,7 +321,8 @@ public void testMaxClientLagEnabledThresholdAbove() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "11 minutes"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, isIcebergMode); + ParameterProvider parameterProvider = + TestUtils.createParameterProvider(parameterMap, prop, isIcebergMode); try { parameterProvider.getCachedMaxClientLagInMs(); Assert.fail("Should not have succeeded"); @@ -304,7 +336,8 @@ public void testMaxClientLagEnableEmptyInput() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, ""); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, isIcebergMode); + ParameterProvider parameterProvider = + TestUtils.createParameterProvider(parameterMap, prop, isIcebergMode); try { parameterProvider.getCachedMaxClientLagInMs(); Assert.fail("Should not have succeeded"); @@ -318,7 +351,8 @@ public void testMaxChunksInBlob() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put("max_chunks_in_blob", 1); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, isIcebergMode); + ParameterProvider parameterProvider = + TestUtils.createParameterProvider(parameterMap, prop, isIcebergMode); Assert.assertEquals(1, parameterProvider.getMaxChunksInBlob()); if (isIcebergMode) { @@ -327,7 +361,7 @@ public void testMaxChunksInBlob() { SFException.class, () -> { parameterMap.put("max_chunks_in_blob", 100); - new ParameterProvider(parameterMap, prop, isIcebergMode); + TestUtils.createParameterProvider(parameterMap, prop, isIcebergMode); }); Assert.assertEquals(e.getVendorCode(), ErrorCode.INVALID_CONFIG_PARAMETER.getMessageCode()); } @@ -338,7 +372,8 @@ public void testMaxChunksInRegistrationRequest() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put("max_chunks_in_registration_request", 101); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, isIcebergMode); + ParameterProvider parameterProvider = + TestUtils.createParameterProvider(parameterMap, prop, isIcebergMode); Assert.assertEquals(101, parameterProvider.getMaxChunksInRegistrationRequest()); IllegalArgumentException e = @@ -346,7 +381,7 @@ public void testMaxChunksInRegistrationRequest() { IllegalArgumentException.class, () -> { parameterMap.put("max_chunks_in_registration_request", 0); - new ParameterProvider(parameterMap, prop, isIcebergMode); + TestUtils.createParameterProvider(parameterMap, prop, isIcebergMode); }); Assert.assertEquals( e.getMessage(), @@ -365,7 +400,7 @@ public void testValidCompressionAlgorithmsAndWithUppercaseLowerCase() { Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM, v); ParameterProvider parameterProvider = - new ParameterProvider(parameterMap, prop, isIcebergMode); + TestUtils.createParameterProvider(parameterMap, prop, isIcebergMode); Assert.assertEquals( Constants.BdecParquetCompression.GZIP, parameterProvider.getBdecParquetCompressionAlgorithm()); @@ -377,7 +412,7 @@ public void testValidCompressionAlgorithmsAndWithUppercaseLowerCase() { Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM, v); ParameterProvider parameterProvider = - new ParameterProvider(parameterMap, prop, isIcebergMode); + TestUtils.createParameterProvider(parameterMap, prop, isIcebergMode); Assert.assertEquals( Constants.BdecParquetCompression.ZSTD, parameterProvider.getBdecParquetCompressionAlgorithm()); @@ -389,7 +424,8 @@ public void testInvalidCompressionAlgorithm() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM, "invalid_comp"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, isIcebergMode); + ParameterProvider parameterProvider = + TestUtils.createParameterProvider(parameterMap, prop, isIcebergMode); try { parameterProvider.getBdecParquetCompressionAlgorithm(); Assert.fail("Should not have succeeded"); @@ -406,7 +442,8 @@ public void EnableNewJsonParsingLogicAsBool() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.ENABLE_NEW_JSON_PARSING_LOGIC, false); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, isIcebergMode); + ParameterProvider parameterProvider = + TestUtils.createParameterProvider(parameterMap, prop, isIcebergMode); Assert.assertFalse(parameterProvider.isEnableNewJsonParsingLogic()); } @@ -415,7 +452,8 @@ public void EnableNewJsonParsingLogicAsString() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.ENABLE_NEW_JSON_PARSING_LOGIC, "false"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, isIcebergMode); + ParameterProvider parameterProvider = + TestUtils.createParameterProvider(parameterMap, prop, isIcebergMode); Assert.assertFalse(parameterProvider.isEnableNewJsonParsingLogic()); } } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java index d5233abac..c33ba1b68 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java @@ -5,6 +5,7 @@ package net.snowflake.ingest.streaming.internal; import static java.time.ZoneOffset.UTC; +import static net.snowflake.ingest.TestUtils.createParameterProvider; import static net.snowflake.ingest.utils.Constants.ACCOUNT_URL; import static net.snowflake.ingest.utils.Constants.OPEN_CHANNEL_ENDPOINT; import static net.snowflake.ingest.utils.Constants.PRIVATE_KEY; @@ -89,18 +90,20 @@ public static Object[] isIcebergMode() { private SnowflakeStreamingIngestClientInternal client; private MockSnowflakeServiceClient.ApiOverride apiOverride; - private Properties prop; - @Before public void setup() { apiOverride = new MockSnowflakeServiceClient.ApiOverride(); CloseableHttpClient httpClient = MockSnowflakeServiceClient.createHttpClient(apiOverride); RequestBuilder requestBuilder = MockSnowflakeServiceClient.createRequestBuilder(httpClient); - prop = new Properties(); - prop.setProperty(ParameterProvider.STREAMING_ICEBERG, String.valueOf(isIcebergMode)); client = new SnowflakeStreamingIngestClientInternal<>( - "client", null, prop, httpClient, true, requestBuilder, new HashMap<>()); + "client", + null, + TestUtils.createProps(isIcebergMode), + httpClient, + true, + requestBuilder, + new HashMap<>()); // some tests assume client is a mock object, just do it for everyone. client = Mockito.spy(client); @@ -505,7 +508,7 @@ public void testOpenChannelSuccessResponse() throws Exception { new SnowflakeStreamingIngestClientInternal<>( "client", new SnowflakeURL("snowflake.dev.local:8082"), - prop, + TestUtils.createProps(isIcebergMode), httpClient, true, requestBuilder, @@ -745,7 +748,7 @@ public void testInsertRowThrottling() { ? ParquetProperties.WriterVersion.PARQUET_2_0 : ParquetProperties.WriterVersion.PARQUET_1_0); - ParameterProvider parameterProvider = new ParameterProvider(isIcebergMode); + ParameterProvider parameterProvider = createParameterProvider(isIcebergMode); memoryInfoProvider.freeMemory = maxMemory * (parameterProvider.getInsertThrottleThresholdInPercentage() - 1) / 100;