diff --git a/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClientFactory.java b/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClientFactory.java index cd6d78787..89e528693 100644 --- a/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClientFactory.java +++ b/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClientFactory.java @@ -28,6 +28,10 @@ public static class Builder { // Allows client to override some default parameter values private Map parameterOverrides; + // Indicates whether it's streaming to Iceberg tables. Open channels on regular tables should + // fail in this mode. + private boolean isIcebergMode; + // Indicates whether it's under test mode private boolean isTestMode; @@ -45,6 +49,11 @@ public Builder setParameterOverrides(Map parameterOverrides) { return this; } + public Builder setIsIceberg(boolean isIcebergMode) { + this.isIcebergMode = isIcebergMode; + return this; + } + public Builder setIsTestMode(boolean isTestMode) { this.isTestMode = isTestMode; return this; @@ -58,7 +67,12 @@ public SnowflakeStreamingIngestClient build() { SnowflakeURL accountURL = new SnowflakeURL(prop.getProperty(Constants.ACCOUNT_URL)); return new SnowflakeStreamingIngestClientInternal<>( - this.name, accountURL, prop, this.parameterOverrides, this.isTestMode); + this.name, + accountURL, + prop, + this.parameterOverrides, + this.isIcebergMode, + this.isTestMode); } } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java index 75eb4f717..3603b8e6e 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java @@ -121,6 +121,9 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea // Indicates whether the client has closed private volatile boolean isClosed; + // Indicates wheter the client is streaming to Iceberg tables + private final boolean isIcebergMode; + // Indicates whether the client is under test mode private final boolean isTestMode; @@ -152,6 +155,7 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea * @param accountURL Snowflake account url * @param prop connection properties * @param httpClient http client for sending request + * @param isIcebergMode whether we're streaming to iceberg tables * @param isTestMode whether we're under test mode * @param requestBuilder http request builder * @param parameterOverrides parameters we override in case we want to set different values @@ -161,13 +165,15 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea SnowflakeURL accountURL, Properties prop, CloseableHttpClient httpClient, + boolean isIcebergMode, boolean isTestMode, RequestBuilder requestBuilder, Map parameterOverrides) { - this.parameterProvider = new ParameterProvider(parameterOverrides, prop); + this.parameterProvider = new ParameterProvider(parameterOverrides, prop, isIcebergMode); this.name = name; String accountName = accountURL == null ? null : accountURL.getAccount(); + this.isIcebergMode = isIcebergMode; this.isTestMode = isTestMode; this.httpClient = httpClient == null ? HttpUtil.getHttpClient(accountName) : httpClient; this.channelCache = new ChannelCache<>(); @@ -251,6 +257,7 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea * @param accountURL Snowflake account url * @param prop connection properties * @param parameterOverrides map of parameters to override for this client + * @param isIcebergMode whether we're streaming to iceberg tables * @param isTestMode indicates whether it's under test mode */ public SnowflakeStreamingIngestClientInternal( @@ -258,16 +265,17 @@ public SnowflakeStreamingIngestClientInternal( SnowflakeURL accountURL, Properties prop, Map parameterOverrides, + boolean isIcebergMode, boolean isTestMode) { - this(name, accountURL, prop, null, isTestMode, null, parameterOverrides); + this(name, accountURL, prop, null, isIcebergMode, isTestMode, null, parameterOverrides); } /*** Constructor for TEST ONLY * * @param name the name of the client */ - SnowflakeStreamingIngestClientInternal(String name) { - this(name, null, null, null, true, null, new HashMap<>()); + SnowflakeStreamingIngestClientInternal(String name, boolean isIcebergMode) { + this(name, null, null, null, isIcebergMode, true, null, new HashMap<>()); } // TESTING ONLY - inject the request builder diff --git a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java index b98972a7d..33f791ca5 100644 --- a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java +++ b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java @@ -1,5 +1,7 @@ package net.snowflake.ingest.utils; +import static net.snowflake.ingest.utils.ErrorCode.INVALID_CONFIG_PARAMETER; + import java.util.HashMap; import java.util.Map; import java.util.Properties; @@ -64,6 +66,10 @@ public class ParameterProvider { public static final Constants.BdecParquetCompression BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT = Constants.BdecParquetCompression.GZIP; + /* Iceberg mode parameters: When streaming to Iceberg mode, different default parameters are required because it generates Parquet files instead of BDEC files. */ + public static final int MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_ICEBERG_MODE_DEFAULT = + 1; // 1 parquet file per blob + /* Parameter that enables using internal Parquet buffers for buffering of rows before serializing. It reduces memory consumption compared to using Java Objects for buffering.*/ public static final boolean ENABLE_PARQUET_INTERNAL_BUFFERING_DEFAULT = false; @@ -80,14 +86,17 @@ public class ParameterProvider { * * @param parameterOverrides Map of parameter name to value * @param props Properties from profile file + * @param isIcebergMode If the provided parameters need to be verified and modified to meet + * Iceberg mode */ - public ParameterProvider(Map parameterOverrides, Properties props) { - this.setParameterMap(parameterOverrides, props); + public ParameterProvider( + Map parameterOverrides, Properties props, boolean isIcebergMode) { + this.setParameterMap(parameterOverrides, props, isIcebergMode); } /** Empty constructor for tests */ - public ParameterProvider() { - this(null, null); + public ParameterProvider(boolean isIcebergMode) { + this(null, null, isIcebergMode); } private void updateValue( @@ -99,6 +108,8 @@ private void updateValue( this.parameterMap.put(key, parameterOverrides.getOrDefault(key, defaultValue)); } else if (props != null) { this.parameterMap.put(key, props.getOrDefault(key, defaultValue)); + } else { + this.parameterMap.put(key, defaultValue); } } @@ -107,8 +118,11 @@ private void updateValue( * * @param parameterOverrides Map of parameter name -> value * @param props Properties file provided to client constructor + * @param isIcebergMode If the provided parameters need to be verified and modified to meet + * Iceberg mode */ - private void setParameterMap(Map parameterOverrides, Properties props) { + private void setParameterMap( + Map parameterOverrides, Properties props, boolean isIcebergMode) { // BUFFER_FLUSH_INTERVAL_IN_MILLIS is deprecated and disallowed if ((parameterOverrides != null && parameterOverrides.containsKey(BUFFER_FLUSH_INTERVAL_IN_MILLIS)) @@ -179,7 +193,9 @@ private void setParameterMap(Map parameterOverrides, Properties this.updateValue( MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST, - MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_DEFAULT, + isIcebergMode + ? MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_ICEBERG_MODE_DEFAULT + : MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_DEFAULT, parameterOverrides, props); @@ -188,6 +204,13 @@ private void setParameterMap(Map parameterOverrides, Properties BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT, parameterOverrides, props); + + // Required parameter override for Iceberg mode + if (isIcebergMode) { + icebergModeValidation( + MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST, + MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_ICEBERG_MODE_DEFAULT); + } } /** @return Longest interval in milliseconds between buffer flushes */ @@ -411,4 +434,14 @@ public Constants.BdecParquetCompression getBdecParquetCompressionAlgorithm() { public String toString() { return "ParameterProvider{" + "parameterMap=" + parameterMap + '}'; } + + private void icebergModeValidation(String key, Object expected) { + Object val = this.parameterMap.get(key); + if (!val.equals(expected)) { + throw new SFException( + INVALID_CONFIG_PARAMETER, + String.format( + "The value %s for %s is invalid in Iceberg mode, should be %s.", val, key, expected)); + } + } } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java index 947908ef9..7c6d797b4 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java @@ -24,7 +24,7 @@ public class ChannelCacheTest { @Before public void setup() { cache = new ChannelCache<>(); - client = new SnowflakeStreamingIngestClientInternal<>("client"); + client = new SnowflakeStreamingIngestClientInternal<>("client", false); channel1 = new SnowflakeStreamingIngestChannelInternal<>( "channel1", diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java index f200c7177..e66306d1f 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java @@ -7,6 +7,7 @@ import static net.snowflake.ingest.utils.Constants.BLOB_NO_HEADER; import static net.snowflake.ingest.utils.Constants.BLOB_TAG_SIZE_IN_BYTES; import static net.snowflake.ingest.utils.Constants.BLOB_VERSION_SIZE_IN_BYTES; +import static net.snowflake.ingest.utils.ParameterProvider.MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_ICEBERG_MODE_DEFAULT; import static net.snowflake.ingest.utils.ParameterProvider.MAX_CHUNK_SIZE_IN_BYTES_DEFAULT; import com.codahale.metrics.Histogram; @@ -48,11 +49,22 @@ import net.snowflake.ingest.utils.SFException; import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatchers; import org.mockito.Mockito; +@RunWith(Parameterized.class) public class FlushServiceTest { + + @Parameterized.Parameters(name = "isIcebergMode: {0}") + public static Object[] isIcebergMode() { + return new Object[] {false, true}; + } + + @Parameterized.Parameter public static boolean isIcebergMode; + public FlushServiceTest() { this.testContextFactory = ParquetTestContext.createFactory(); } @@ -86,7 +98,7 @@ private abstract static class TestContext implements AutoCloseable { TestContext() { stage = Mockito.mock(StreamingIngestStage.class); Mockito.when(stage.getClientPrefix()).thenReturn("client_prefix"); - parameterProvider = new ParameterProvider(); + parameterProvider = new ParameterProvider(isIcebergMode); client = Mockito.mock(SnowflakeStreamingIngestClientInternal.class); Mockito.when(client.getParameterProvider()).thenReturn(parameterProvider); channelCache = new ChannelCache<>(); @@ -586,7 +598,9 @@ public void runTestBlobSplitDueToNumberOfChunks(int numberOfRows) throws Excepti Math.ceil( (double) numberOfRows / channelsPerTable - / ParameterProvider.MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_DEFAULT); + / (isIcebergMode + ? MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_ICEBERG_MODE_DEFAULT + : ParameterProvider.MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_DEFAULT)); final TestContext>> testContext = testContextFactory.create(); @@ -861,7 +875,7 @@ public void testInvalidateChannels() { // Create a new Client in order to not interfere with other tests SnowflakeStreamingIngestClientInternal client = Mockito.mock(SnowflakeStreamingIngestClientInternal.class); - ParameterProvider parameterProvider = new ParameterProvider(); + ParameterProvider parameterProvider = new ParameterProvider(isIcebergMode); ChannelCache channelCache = new ChannelCache<>(); Mockito.when(client.getChannelCache()).thenReturn(channelCache); Mockito.when(client.getParameterProvider()).thenReturn(parameterProvider); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/OAuthBasicTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/OAuthBasicTest.java index 6351f267d..0c0c5bb85 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/OAuthBasicTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/OAuthBasicTest.java @@ -114,7 +114,7 @@ public void testCreateOAuthClient() throws Exception { @Test public void testSetRefreshToken() throws Exception { SnowflakeStreamingIngestClientInternal client = - new SnowflakeStreamingIngestClientInternal<>("TEST_CLIENT"); + new SnowflakeStreamingIngestClientInternal<>("TEST_CLIENT", false); MockOAuthClient mockOAuthClient = new MockOAuthClient(); OAuthManager oAuthManager = diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java index 86cece9c7..fb73aaa66 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java @@ -1,12 +1,16 @@ package net.snowflake.ingest.streaming.internal; +import static net.snowflake.ingest.utils.ParameterProvider.MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_ICEBERG_MODE_DEFAULT; + import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import net.snowflake.ingest.utils.Constants; +import net.snowflake.ingest.utils.ErrorCode; import net.snowflake.ingest.utils.ParameterProvider; +import net.snowflake.ingest.utils.SFException; import org.junit.Assert; import org.junit.Test; @@ -31,7 +35,7 @@ private Map getStartingParameterMap() { public void withValuesSet() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); Assert.assertEquals(1000L, parameterProvider.getCachedMaxClientLagInMs()); Assert.assertEquals(4L, parameterProvider.getBufferFlushCheckIntervalInMs()); @@ -54,7 +58,7 @@ public void withNullProps() { parameterMap.put(ParameterProvider.BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS, 4L); parameterMap.put(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_PERCENTAGE, 6); parameterMap.put(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_BYTES, 1024); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, null); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, null, false); Assert.assertEquals(3000, parameterProvider.getCachedMaxClientLagInMs()); Assert.assertEquals(4, parameterProvider.getBufferFlushCheckIntervalInMs()); @@ -72,7 +76,7 @@ public void withNullParameterMap() { props.put(ParameterProvider.BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS, 4L); props.put(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_PERCENTAGE, 6); props.put(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_BYTES, 1024); - ParameterProvider parameterProvider = new ParameterProvider(null, props); + ParameterProvider parameterProvider = new ParameterProvider(null, props, false); Assert.assertEquals(3000, parameterProvider.getCachedMaxClientLagInMs()); Assert.assertEquals(4, parameterProvider.getBufferFlushCheckIntervalInMs()); @@ -85,7 +89,7 @@ public void withNullParameterMap() { @Test public void withNullInputs() { - ParameterProvider parameterProvider = new ParameterProvider(null, null); + ParameterProvider parameterProvider = new ParameterProvider(null, null, false); Assert.assertEquals( ParameterProvider.MAX_CLIENT_LAG_DEFAULT, parameterProvider.getCachedMaxClientLagInMs()); @@ -105,7 +109,7 @@ public void withNullInputs() { @Test public void withDefaultValues() { - ParameterProvider parameterProvider = new ParameterProvider(); + ParameterProvider parameterProvider = new ParameterProvider(false); Assert.assertEquals( ParameterProvider.MAX_CLIENT_LAG_DEFAULT, parameterProvider.getCachedMaxClientLagInMs()); @@ -137,12 +141,49 @@ public void withDefaultValues() { parameterProvider.getBdecParquetCompressionAlgorithm()); } + @Test + public void withIcebergDefaultValues() { + ParameterProvider parameterProvider = new ParameterProvider(true); + + Assert.assertEquals( + ParameterProvider.MAX_CLIENT_LAG_DEFAULT, parameterProvider.getCachedMaxClientLagInMs()); + Assert.assertEquals( + ParameterProvider.BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS_DEFAULT, + parameterProvider.getBufferFlushCheckIntervalInMs()); + Assert.assertEquals( + ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_PERCENTAGE_DEFAULT, + parameterProvider.getInsertThrottleThresholdInPercentage()); + Assert.assertEquals( + ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_BYTES_DEFAULT, + parameterProvider.getInsertThrottleThresholdInBytes()); + Assert.assertEquals( + ParameterProvider.INSERT_THROTTLE_INTERVAL_IN_MILLIS_DEFAULT, + parameterProvider.getInsertThrottleIntervalInMs()); + Assert.assertEquals( + ParameterProvider.IO_TIME_CPU_RATIO_DEFAULT, parameterProvider.getIOTimeCpuRatio()); + Assert.assertEquals( + ParameterProvider.BLOB_UPLOAD_MAX_RETRY_COUNT_DEFAULT, + parameterProvider.getBlobUploadMaxRetryCount()); + Assert.assertEquals( + ParameterProvider.MAX_MEMORY_LIMIT_IN_BYTES_DEFAULT, + parameterProvider.getMaxMemoryLimitInBytes()); + Assert.assertEquals( + ParameterProvider.MAX_CHANNEL_SIZE_IN_BYTES_DEFAULT, + parameterProvider.getMaxChannelSizeInBytes()); + Assert.assertEquals( + ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT, + parameterProvider.getBdecParquetCompressionAlgorithm()); + Assert.assertEquals( + MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_ICEBERG_MODE_DEFAULT, + parameterProvider.getMaxChunksInBlobAndRegistrationRequest()); + } + @Test public void testMaxClientLagEnabled() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "2 second"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); Assert.assertEquals(2000, parameterProvider.getCachedMaxClientLagInMs()); // call again to trigger caching logic Assert.assertEquals(2000, parameterProvider.getCachedMaxClientLagInMs()); @@ -153,7 +194,7 @@ public void testMaxClientLagEnabledPluralTimeUnit() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "2 seconds"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); Assert.assertEquals(2000, parameterProvider.getCachedMaxClientLagInMs()); } @@ -162,7 +203,7 @@ public void testMaxClientLagEnabledMinuteTimeUnit() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "1 minute"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); Assert.assertEquals(60000, parameterProvider.getCachedMaxClientLagInMs()); } @@ -171,7 +212,7 @@ public void testMaxClientLagEnabledMinuteTimeUnitPluralTimeUnit() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "2 minutes"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); Assert.assertEquals(120000, parameterProvider.getCachedMaxClientLagInMs()); } @@ -179,7 +220,7 @@ public void testMaxClientLagEnabledMinuteTimeUnitPluralTimeUnit() { public void testMaxClientLagEnabledDefaultValue() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); Assert.assertEquals( ParameterProvider.MAX_CLIENT_LAG_DEFAULT, parameterProvider.getCachedMaxClientLagInMs()); } @@ -189,7 +230,7 @@ public void testMaxClientLagEnabledDefaultUnit() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "3000"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); Assert.assertEquals(3000, parameterProvider.getCachedMaxClientLagInMs()); } @@ -198,7 +239,7 @@ public void testMaxClientLagEnabledLongInput() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, 3000L); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); Assert.assertEquals(3000, parameterProvider.getCachedMaxClientLagInMs()); } @@ -207,7 +248,7 @@ public void testMaxClientLagEnabledMissingUnitTimeUnitSupplied() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, " year"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); try { parameterProvider.getCachedMaxClientLagInMs(); Assert.fail("Should not have succeeded"); @@ -221,7 +262,7 @@ public void testMaxClientLagEnabledInvalidTimeUnit() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "1 year"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); try { parameterProvider.getCachedMaxClientLagInMs(); Assert.fail("Should not have succeeded"); @@ -235,7 +276,7 @@ public void testMaxClientLagEnabledInvalidUnit() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "banana minute"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); try { parameterProvider.getCachedMaxClientLagInMs(); Assert.fail("Should not have succeeded"); @@ -249,7 +290,7 @@ public void testMaxClientLagEnabledThresholdBelow() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "0 second"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); try { parameterProvider.getCachedMaxClientLagInMs(); Assert.fail("Should not have succeeded"); @@ -263,7 +304,7 @@ public void testMaxClientLagEnabledThresholdAbove() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "11 minutes"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); try { parameterProvider.getCachedMaxClientLagInMs(); Assert.fail("Should not have succeeded"); @@ -277,7 +318,7 @@ public void testMaxClientLagEnableEmptyInput() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, ""); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); try { parameterProvider.getCachedMaxClientLagInMs(); Assert.fail("Should not have succeeded"); @@ -291,8 +332,20 @@ public void testMaxChunksInBlobAndRegistrationRequest() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put("max_chunks_in_blob_and_registration_request", 1); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); + Assert.assertEquals(1, parameterProvider.getMaxChunksInBlobAndRegistrationRequest()); + + parameterProvider = new ParameterProvider(parameterMap, prop, true); Assert.assertEquals(1, parameterProvider.getMaxChunksInBlobAndRegistrationRequest()); + + SFException e = + Assert.assertThrows( + SFException.class, + () -> { + parameterMap.put("max_chunks_in_blob_and_registration_request", 100); + new ParameterProvider(parameterMap, prop, true); + }); + Assert.assertEquals(e.getVendorCode(), ErrorCode.INVALID_CONFIG_PARAMETER.getMessageCode()); } @Test @@ -303,7 +356,7 @@ public void testValidCompressionAlgorithmsAndWithUppercaseLowerCase() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM, v); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); Assert.assertEquals( Constants.BdecParquetCompression.GZIP, parameterProvider.getBdecParquetCompressionAlgorithm()); @@ -314,7 +367,7 @@ public void testValidCompressionAlgorithmsAndWithUppercaseLowerCase() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM, v); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); Assert.assertEquals( Constants.BdecParquetCompression.ZSTD, parameterProvider.getBdecParquetCompressionAlgorithm()); @@ -326,7 +379,7 @@ public void testInvalidCompressionAlgorithm() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM, "invalid_comp"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); try { parameterProvider.getBdecParquetCompressionAlgorithm(); Assert.fail("Should not have succeeded"); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/RegisterServiceTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/RegisterServiceTest.java index 37eb5f96e..4eaea15a4 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/RegisterServiceTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/RegisterServiceTest.java @@ -14,8 +14,17 @@ import org.junit.Assert; import org.junit.Ignore; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +@RunWith(Parameterized.class) public class RegisterServiceTest { + @Parameterized.Parameters(name = "isIcebergMode: {0}") + public static Object[] isIcebergMode() { + return new Object[] {false, true}; + } + + @Parameterized.Parameter public boolean isIcebergMode; @Test public void testRegisterService() throws ExecutionException, InterruptedException { @@ -45,7 +54,7 @@ public void testRegisterService() throws ExecutionException, InterruptedExceptio @Test public void testRegisterServiceTimeoutException() throws Exception { SnowflakeStreamingIngestClientInternal client = - new SnowflakeStreamingIngestClientInternal<>("client"); + new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode); RegisterService rs = new RegisterService<>(client, true); Pair, CompletableFuture> blobFuture1 = @@ -73,7 +82,7 @@ public void testRegisterServiceTimeoutException() throws Exception { @Test public void testRegisterServiceTimeoutException_testRetries() throws Exception { SnowflakeStreamingIngestClientInternal client = - new SnowflakeStreamingIngestClientInternal<>("client"); + new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode); RegisterService rs = new RegisterService<>(client, true); Pair, CompletableFuture> blobFuture1 = @@ -107,7 +116,7 @@ public void testRegisterServiceTimeoutException_testRetries() throws Exception { @Test public void testRegisterServiceNonTimeoutException() { SnowflakeStreamingIngestClientInternal client = - new SnowflakeStreamingIngestClientInternal<>("client"); + new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode); RegisterService rs = new RegisterService<>(client, true); CompletableFuture future = new CompletableFuture<>(); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java index b4fa769a1..c10692416 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java @@ -44,8 +44,11 @@ import net.snowflake.ingest.utils.Utils; import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.mockito.Mockito; +@RunWith(Parameterized.class) public class SnowflakeStreamingIngestChannelTest { /** @@ -67,6 +70,13 @@ public long getFreeMemory() { } } + @Parameterized.Parameters(name = "isIcebergMode: {0}") + public static Object[] isIcebergMode() { + return new Object[] {false, true}; + } + + @Parameterized.Parameter public boolean isIcebergMode; + @Test public void testChannelFactoryNullFields() { String name = "CHANNEL"; @@ -76,7 +86,7 @@ public void testChannelFactoryNullFields() { long channelSequencer = 0L; long rowSequencer = 0L; SnowflakeStreamingIngestClientInternal client = - new SnowflakeStreamingIngestClientInternal<>("client"); + new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode); Object[] fields = new Object[] { @@ -118,7 +128,7 @@ public void testChannelFactorySuccess() { long rowSequencer = 0L; SnowflakeStreamingIngestClientInternal client = - new SnowflakeStreamingIngestClientInternal<>("client"); + new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode); SnowflakeStreamingIngestChannelInternal channel = SnowflakeStreamingIngestChannelFactory.builder(name) @@ -153,7 +163,7 @@ public void testChannelFactorySuccess() { @Test public void testChannelValid() { SnowflakeStreamingIngestClientInternal client = - new SnowflakeStreamingIngestClientInternal<>("client"); + new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode); SnowflakeStreamingIngestChannelInternal channel = new SnowflakeStreamingIngestChannelInternal<>( "channel", @@ -203,7 +213,7 @@ public void testChannelValid() { @Test public void testChannelClose() { SnowflakeStreamingIngestClientInternal client = - new SnowflakeStreamingIngestClientInternal<>("client"); + new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode); SnowflakeStreamingIngestChannelInternal channel = new SnowflakeStreamingIngestChannelInternal<>( "channel", @@ -348,6 +358,7 @@ public void testOpenChannelErrorResponse() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + isIcebergMode, true, requestBuilder, null); @@ -418,6 +429,7 @@ public void testOpenChannelSnowflakeInternalErrorResponse() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + isIcebergMode, true, requestBuilder, null); @@ -500,6 +512,7 @@ public void testOpenChannelSuccessResponse() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + isIcebergMode, true, requestBuilder, null); @@ -538,7 +551,9 @@ public void testOpenChannelSuccessResponse() throws Exception { @Test public void testInsertRow() { SnowflakeStreamingIngestClientInternal client; - client = new SnowflakeStreamingIngestClientInternal("client_PARQUET"); + client = + new SnowflakeStreamingIngestClientInternal( + "client_PARQUET", isIcebergMode); SnowflakeStreamingIngestChannelInternal channel = new SnowflakeStreamingIngestChannelInternal<>( "channel", @@ -622,7 +637,8 @@ public void testInsertTooLargeRow() { schema.forEach(x -> row.put(x.getName(), byteArrayOneMb)); SnowflakeStreamingIngestClientInternal client; - client = new SnowflakeStreamingIngestClientInternal("test_client"); + client = + new SnowflakeStreamingIngestClientInternal("test_client", isIcebergMode); // Test channel with on error CONTINUE SnowflakeStreamingIngestChannelInternal channel = @@ -706,7 +722,7 @@ public void testInsertRowThrottling() { memoryInfoProvider.maxMemory = maxMemory; SnowflakeStreamingIngestClientInternal client = - new SnowflakeStreamingIngestClientInternal<>("client"); + new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode); SnowflakeStreamingIngestChannelInternal channel = new SnowflakeStreamingIngestChannelInternal<>( "channel", @@ -722,7 +738,7 @@ public void testInsertRowThrottling() { OpenChannelRequest.OnErrorOption.CONTINUE, UTC); - ParameterProvider parameterProvider = new ParameterProvider(); + ParameterProvider parameterProvider = new ParameterProvider(isIcebergMode); memoryInfoProvider.freeMemory = maxMemory * (parameterProvider.getInsertThrottleThresholdInPercentage() - 1) / 100; @@ -752,7 +768,7 @@ public void testInsertRowThrottling() { @Test public void testFlush() throws Exception { SnowflakeStreamingIngestClientInternal client = - Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client")); + Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode)); SnowflakeStreamingIngestChannelInternal channel = new SnowflakeStreamingIngestChannelInternal<>( "channel", @@ -788,7 +804,7 @@ public void testFlush() throws Exception { @Test public void testClose() throws Exception { SnowflakeStreamingIngestClientInternal client = - Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client")); + Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode)); SnowflakeStreamingIngestChannel channel = new SnowflakeStreamingIngestChannelInternal<>( "channel", @@ -822,7 +838,7 @@ public void testClose() throws Exception { @Test public void testDropOnClose() throws Exception { SnowflakeStreamingIngestClientInternal client = - Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client")); + Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode)); SnowflakeStreamingIngestChannelInternal channel = new SnowflakeStreamingIngestChannelInternal<>( "channel", @@ -859,7 +875,7 @@ public void testDropOnClose() throws Exception { @Test public void testDropOnCloseInvalidChannel() throws Exception { SnowflakeStreamingIngestClientInternal client = - Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client")); + Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode)); SnowflakeStreamingIngestChannelInternal channel = new SnowflakeStreamingIngestChannelInternal<>( "channel", @@ -892,7 +908,7 @@ public void testDropOnCloseInvalidChannel() throws Exception { public void testGetLatestCommittedOffsetToken() { String offsetToken = "10"; SnowflakeStreamingIngestClientInternal client = - Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client")); + Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode)); SnowflakeStreamingIngestChannel channel = new SnowflakeStreamingIngestChannelInternal<>( "channel", diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java index 553efbd31..2b84f64a5 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java @@ -66,9 +66,12 @@ import org.junit.Before; import org.junit.Ignore; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.mockito.ArgumentMatchers; import org.mockito.Mockito; +@RunWith(Parameterized.class) public class SnowflakeStreamingIngestClientTest { private static final ObjectMapper objectMapper = new ObjectMapper(); @@ -79,6 +82,14 @@ public class SnowflakeStreamingIngestClientTest { SnowflakeStreamingIngestChannelInternal channel3; SnowflakeStreamingIngestChannelInternal channel4; + // TODO: Add IcebergMode = True after Streaming to Iceberg is supported. + @Parameterized.Parameters(name = "isIcebergMode: {0}") + public static Object[] isIcebergMode() { + return new Object[] {false}; + } + + @Parameterized.Parameter public boolean isIcebergMode; + @Before public void setup() throws Exception { objectMapper.setVisibility(PropertyAccessor.GETTER, JsonAutoDetect.Visibility.ANY); @@ -184,6 +195,7 @@ public void testConstructorParameters() throws Exception { SnowflakeStreamingIngestClientFactory.builder("client") .setProperties(prop) .setParameterOverrides(parameterMap) + .setIsIceberg(isIcebergMode) .setIsTestMode(true) .build(); @@ -211,6 +223,7 @@ public void testClientFactoryWithJmxMetrics() throws Exception { .setProperties(prop) .setParameterOverrides( Collections.singletonMap(ENABLE_SNOWPIPE_STREAMING_METRICS, true)) + .setIsIceberg(isIcebergMode) .build(); Assert.assertEquals("client", client.getName()); @@ -362,6 +375,7 @@ public void testGetChannelsStatusWithRequest() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + isIcebergMode, true, requestBuilder, null); @@ -421,6 +435,7 @@ public void testDropChannel() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + isIcebergMode, true, requestBuilder, null); @@ -466,6 +481,7 @@ public void testGetChannelsStatusWithRequestError() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + isIcebergMode, true, requestBuilder, null); @@ -710,6 +726,7 @@ public void testGetRetryBlobs() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + isIcebergMode, true, requestBuilder, null); @@ -751,6 +768,7 @@ public void testRegisterBlobErrorResponse() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + isIcebergMode, true, requestBuilder, null); @@ -798,6 +816,7 @@ public void testRegisterBlobSnowflakeInternalErrorResponse() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + isIcebergMode, true, requestBuilder, null); @@ -854,6 +873,7 @@ public void testRegisterBlobSuccessResponse() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + isIcebergMode, true, requestBuilder, null); @@ -938,6 +958,7 @@ public void testRegisterBlobsRetries() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + isIcebergMode, true, requestBuilder, null); @@ -967,6 +988,7 @@ public void testRegisterBlobChunkLimit() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + isIcebergMode, true, requestBuilder, null)); @@ -1139,6 +1161,7 @@ public void testRegisterBlobsRetriesSucceeds() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + isIcebergMode, true, requestBuilder, null); @@ -1213,6 +1236,7 @@ public void testRegisterBlobResponseWithInvalidChannel() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + isIcebergMode, true, requestBuilder, null); @@ -1263,7 +1287,7 @@ public void testRegisterBlobResponseWithInvalidChannel() throws Exception { @Test public void testFlush() throws Exception { SnowflakeStreamingIngestClientInternal client = - Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client")); + Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode)); ChannelsStatusResponse response = new ChannelsStatusResponse(); response.setStatusCode(0L); response.setMessage("Success"); @@ -1285,7 +1309,7 @@ public void testFlush() throws Exception { @Test public void testClose() throws Exception { SnowflakeStreamingIngestClientInternal client = - Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client")); + Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode)); ChannelsStatusResponse response = new ChannelsStatusResponse(); response.setStatusCode(0L); response.setMessage("Success"); @@ -1319,7 +1343,7 @@ public void testClose() throws Exception { @Test public void testCloseWithError() throws Exception { SnowflakeStreamingIngestClientInternal client = - Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client")); + Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode)); CompletableFuture future = new CompletableFuture<>(); future.completeExceptionally(new Exception("Simulating Error")); @@ -1357,7 +1381,7 @@ public void testCloseWithError() throws Exception { @Test public void testVerifyChannelsAreFullyCommittedSuccess() throws Exception { SnowflakeStreamingIngestClientInternal client = - Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client")); + Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode)); SnowflakeStreamingIngestChannelInternal channel = new SnowflakeStreamingIngestChannelInternal<>( "channel1", @@ -1405,6 +1429,7 @@ public void testFlushServiceException() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + isIcebergMode, true, requestBuilder, parameterMap); @@ -1441,6 +1466,7 @@ public void testGetLatestCommittedOffsetTokens() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + isIcebergMode, true, requestBuilder, null); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestStageTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestStageTest.java index 1ba9f98df..d12c6231c 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestStageTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestStageTest.java @@ -276,7 +276,7 @@ public void testRefreshSnowflakeMetadataRemote() throws Exception { Mockito.when(mockResponse.getEntity()).thenReturn(entity); Mockito.when(mockClient.execute(Mockito.any())).thenReturn(mockResponse); - ParameterProvider parameterProvider = new ParameterProvider(); + ParameterProvider parameterProvider = new ParameterProvider(false); StreamingIngestStage stage = new StreamingIngestStage(true, "role", mockClient, mockBuilder, "clientName", 1);