From c6dfbf31be807072e31bfe137f7c7dc7861fad20 Mon Sep 17 00:00:00 2001 From: Alec Huang Date: Mon, 10 Jun 2024 17:31:43 -0700 Subject: [PATCH 1/2] SNOW-1437885 Disable blob interleaving when in Iceberg mode (#763) * Add parameters & disable interleaving mode --- ...SnowflakeStreamingIngestClientFactory.java | 16 ++- ...nowflakeStreamingIngestClientInternal.java | 16 ++- .../ingest/utils/ParameterProvider.java | 45 +++++++-- .../streaming/internal/ChannelCacheTest.java | 2 +- .../streaming/internal/FlushServiceTest.java | 20 +++- .../streaming/internal/OAuthBasicTest.java | 2 +- .../internal/ParameterProviderTest.java | 97 ++++++++++++++----- .../internal/RegisterServiceTest.java | 15 ++- .../SnowflakeStreamingIngestChannelTest.java | 42 +++++--- .../SnowflakeStreamingIngestClientTest.java | 34 ++++++- .../internal/StreamingIngestStageTest.java | 2 +- 11 files changed, 232 insertions(+), 59 deletions(-) diff --git a/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClientFactory.java b/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClientFactory.java index cd6d78787..89e528693 100644 --- a/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClientFactory.java +++ b/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClientFactory.java @@ -28,6 +28,10 @@ public static class Builder { // Allows client to override some default parameter values private Map parameterOverrides; + // Indicates whether it's streaming to Iceberg tables. Open channels on regular tables should + // fail in this mode. + private boolean isIcebergMode; + // Indicates whether it's under test mode private boolean isTestMode; @@ -45,6 +49,11 @@ public Builder setParameterOverrides(Map parameterOverrides) { return this; } + public Builder setIsIceberg(boolean isIcebergMode) { + this.isIcebergMode = isIcebergMode; + return this; + } + public Builder setIsTestMode(boolean isTestMode) { this.isTestMode = isTestMode; return this; @@ -58,7 +67,12 @@ public SnowflakeStreamingIngestClient build() { SnowflakeURL accountURL = new SnowflakeURL(prop.getProperty(Constants.ACCOUNT_URL)); return new SnowflakeStreamingIngestClientInternal<>( - this.name, accountURL, prop, this.parameterOverrides, this.isTestMode); + this.name, + accountURL, + prop, + this.parameterOverrides, + this.isIcebergMode, + this.isTestMode); } } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java index 2990b49d8..13cc673ff 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java @@ -121,6 +121,9 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea // Indicates whether the client has closed private volatile boolean isClosed; + // Indicates wheter the client is streaming to Iceberg tables + private final boolean isIcebergMode; + // Indicates whether the client is under test mode private final boolean isTestMode; @@ -152,6 +155,7 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea * @param accountURL Snowflake account url * @param prop connection properties * @param httpClient http client for sending request + * @param isIcebergMode whether we're streaming to iceberg tables * @param isTestMode whether we're under test mode * 
@param requestBuilder http request builder * @param parameterOverrides parameters we override in case we want to set different values @@ -161,13 +165,15 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea SnowflakeURL accountURL, Properties prop, CloseableHttpClient httpClient, + boolean isIcebergMode, boolean isTestMode, RequestBuilder requestBuilder, Map parameterOverrides) { - this.parameterProvider = new ParameterProvider(parameterOverrides, prop); + this.parameterProvider = new ParameterProvider(parameterOverrides, prop, isIcebergMode); this.name = name; String accountName = accountURL == null ? null : accountURL.getAccount(); + this.isIcebergMode = isIcebergMode; this.isTestMode = isTestMode; this.httpClient = httpClient == null ? HttpUtil.getHttpClient(accountName) : httpClient; this.channelCache = new ChannelCache<>(); @@ -251,6 +257,7 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea * @param accountURL Snowflake account url * @param prop connection properties * @param parameterOverrides map of parameters to override for this client + * @param isIcebergMode whether we're streaming to iceberg tables * @param isTestMode indicates whether it's under test mode */ public SnowflakeStreamingIngestClientInternal( @@ -258,16 +265,17 @@ public SnowflakeStreamingIngestClientInternal( SnowflakeURL accountURL, Properties prop, Map parameterOverrides, + boolean isIcebergMode, boolean isTestMode) { - this(name, accountURL, prop, null, isTestMode, null, parameterOverrides); + this(name, accountURL, prop, null, isIcebergMode, isTestMode, null, parameterOverrides); } /*** Constructor for TEST ONLY * * @param name the name of the client */ - SnowflakeStreamingIngestClientInternal(String name) { - this(name, null, null, null, true, null, new HashMap<>()); + SnowflakeStreamingIngestClientInternal(String name, boolean isIcebergMode) { + this(name, null, null, null, isIcebergMode, true, null, new HashMap<>()); } // TESTING ONLY - inject the request builder diff --git a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java index b98972a7d..33f791ca5 100644 --- a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java +++ b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java @@ -1,5 +1,7 @@ package net.snowflake.ingest.utils; +import static net.snowflake.ingest.utils.ErrorCode.INVALID_CONFIG_PARAMETER; + import java.util.HashMap; import java.util.Map; import java.util.Properties; @@ -64,6 +66,10 @@ public class ParameterProvider { public static final Constants.BdecParquetCompression BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT = Constants.BdecParquetCompression.GZIP; + /* Iceberg mode parameters: When streaming to Iceberg mode, different default parameters are required because it generates Parquet files instead of BDEC files. */ + public static final int MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_ICEBERG_MODE_DEFAULT = + 1; // 1 parquet file per blob + /* Parameter that enables using internal Parquet buffers for buffering of rows before serializing. 
It reduces memory consumption compared to using Java Objects for buffering.*/ public static final boolean ENABLE_PARQUET_INTERNAL_BUFFERING_DEFAULT = false; @@ -80,14 +86,17 @@ public class ParameterProvider { * * @param parameterOverrides Map of parameter name to value * @param props Properties from profile file + * @param isIcebergMode If the provided parameters need to be verified and modified to meet + * Iceberg mode */ - public ParameterProvider(Map parameterOverrides, Properties props) { - this.setParameterMap(parameterOverrides, props); + public ParameterProvider( + Map parameterOverrides, Properties props, boolean isIcebergMode) { + this.setParameterMap(parameterOverrides, props, isIcebergMode); } /** Empty constructor for tests */ - public ParameterProvider() { - this(null, null); + public ParameterProvider(boolean isIcebergMode) { + this(null, null, isIcebergMode); } private void updateValue( @@ -99,6 +108,8 @@ private void updateValue( this.parameterMap.put(key, parameterOverrides.getOrDefault(key, defaultValue)); } else if (props != null) { this.parameterMap.put(key, props.getOrDefault(key, defaultValue)); + } else { + this.parameterMap.put(key, defaultValue); } } @@ -107,8 +118,11 @@ private void updateValue( * * @param parameterOverrides Map of parameter name -> value * @param props Properties file provided to client constructor + * @param isIcebergMode If the provided parameters need to be verified and modified to meet + * Iceberg mode */ - private void setParameterMap(Map parameterOverrides, Properties props) { + private void setParameterMap( + Map parameterOverrides, Properties props, boolean isIcebergMode) { // BUFFER_FLUSH_INTERVAL_IN_MILLIS is deprecated and disallowed if ((parameterOverrides != null && parameterOverrides.containsKey(BUFFER_FLUSH_INTERVAL_IN_MILLIS)) @@ -179,7 +193,9 @@ private void setParameterMap(Map parameterOverrides, Properties this.updateValue( MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST, - MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_DEFAULT, + isIcebergMode + ? 
MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_ICEBERG_MODE_DEFAULT + : MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_DEFAULT, parameterOverrides, props); @@ -188,6 +204,13 @@ private void setParameterMap(Map parameterOverrides, Properties BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT, parameterOverrides, props); + + // Required parameter override for Iceberg mode + if (isIcebergMode) { + icebergModeValidation( + MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST, + MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_ICEBERG_MODE_DEFAULT); + } } /** @return Longest interval in milliseconds between buffer flushes */ @@ -411,4 +434,14 @@ public Constants.BdecParquetCompression getBdecParquetCompressionAlgorithm() { public String toString() { return "ParameterProvider{" + "parameterMap=" + parameterMap + '}'; } + + private void icebergModeValidation(String key, Object expected) { + Object val = this.parameterMap.get(key); + if (!val.equals(expected)) { + throw new SFException( + INVALID_CONFIG_PARAMETER, + String.format( + "The value %s for %s is invalid in Iceberg mode, should be %s.", val, key, expected)); + } + } } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java index 947908ef9..7c6d797b4 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java @@ -24,7 +24,7 @@ public class ChannelCacheTest { @Before public void setup() { cache = new ChannelCache<>(); - client = new SnowflakeStreamingIngestClientInternal<>("client"); + client = new SnowflakeStreamingIngestClientInternal<>("client", false); channel1 = new SnowflakeStreamingIngestChannelInternal<>( "channel1", diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java index f200c7177..e66306d1f 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java @@ -7,6 +7,7 @@ import static net.snowflake.ingest.utils.Constants.BLOB_NO_HEADER; import static net.snowflake.ingest.utils.Constants.BLOB_TAG_SIZE_IN_BYTES; import static net.snowflake.ingest.utils.Constants.BLOB_VERSION_SIZE_IN_BYTES; +import static net.snowflake.ingest.utils.ParameterProvider.MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_ICEBERG_MODE_DEFAULT; import static net.snowflake.ingest.utils.ParameterProvider.MAX_CHUNK_SIZE_IN_BYTES_DEFAULT; import com.codahale.metrics.Histogram; @@ -48,11 +49,22 @@ import net.snowflake.ingest.utils.SFException; import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatchers; import org.mockito.Mockito; +@RunWith(Parameterized.class) public class FlushServiceTest { + + @Parameterized.Parameters(name = "isIcebergMode: {0}") + public static Object[] isIcebergMode() { + return new Object[] {false, true}; + } + + @Parameterized.Parameter public static boolean isIcebergMode; + public FlushServiceTest() { this.testContextFactory = ParquetTestContext.createFactory(); } @@ -86,7 +98,7 @@ private abstract static class TestContext implements AutoCloseable { TestContext() { stage = Mockito.mock(StreamingIngestStage.class); Mockito.when(stage.getClientPrefix()).thenReturn("client_prefix"); - parameterProvider = 
new ParameterProvider(); + parameterProvider = new ParameterProvider(isIcebergMode); client = Mockito.mock(SnowflakeStreamingIngestClientInternal.class); Mockito.when(client.getParameterProvider()).thenReturn(parameterProvider); channelCache = new ChannelCache<>(); @@ -586,7 +598,9 @@ public void runTestBlobSplitDueToNumberOfChunks(int numberOfRows) throws Excepti Math.ceil( (double) numberOfRows / channelsPerTable - / ParameterProvider.MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_DEFAULT); + / (isIcebergMode + ? MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_ICEBERG_MODE_DEFAULT + : ParameterProvider.MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_DEFAULT)); final TestContext>> testContext = testContextFactory.create(); @@ -861,7 +875,7 @@ public void testInvalidateChannels() { // Create a new Client in order to not interfere with other tests SnowflakeStreamingIngestClientInternal client = Mockito.mock(SnowflakeStreamingIngestClientInternal.class); - ParameterProvider parameterProvider = new ParameterProvider(); + ParameterProvider parameterProvider = new ParameterProvider(isIcebergMode); ChannelCache channelCache = new ChannelCache<>(); Mockito.when(client.getChannelCache()).thenReturn(channelCache); Mockito.when(client.getParameterProvider()).thenReturn(parameterProvider); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/OAuthBasicTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/OAuthBasicTest.java index 6351f267d..0c0c5bb85 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/OAuthBasicTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/OAuthBasicTest.java @@ -114,7 +114,7 @@ public void testCreateOAuthClient() throws Exception { @Test public void testSetRefreshToken() throws Exception { SnowflakeStreamingIngestClientInternal client = - new SnowflakeStreamingIngestClientInternal<>("TEST_CLIENT"); + new SnowflakeStreamingIngestClientInternal<>("TEST_CLIENT", false); MockOAuthClient mockOAuthClient = new MockOAuthClient(); OAuthManager oAuthManager = diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java index 86cece9c7..fb73aaa66 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java @@ -1,12 +1,16 @@ package net.snowflake.ingest.streaming.internal; +import static net.snowflake.ingest.utils.ParameterProvider.MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_ICEBERG_MODE_DEFAULT; + import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import net.snowflake.ingest.utils.Constants; +import net.snowflake.ingest.utils.ErrorCode; import net.snowflake.ingest.utils.ParameterProvider; +import net.snowflake.ingest.utils.SFException; import org.junit.Assert; import org.junit.Test; @@ -31,7 +35,7 @@ private Map getStartingParameterMap() { public void withValuesSet() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); Assert.assertEquals(1000L, parameterProvider.getCachedMaxClientLagInMs()); Assert.assertEquals(4L, parameterProvider.getBufferFlushCheckIntervalInMs()); @@ -54,7 +58,7 @@ public void withNullProps() { 
parameterMap.put(ParameterProvider.BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS, 4L); parameterMap.put(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_PERCENTAGE, 6); parameterMap.put(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_BYTES, 1024); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, null); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, null, false); Assert.assertEquals(3000, parameterProvider.getCachedMaxClientLagInMs()); Assert.assertEquals(4, parameterProvider.getBufferFlushCheckIntervalInMs()); @@ -72,7 +76,7 @@ public void withNullParameterMap() { props.put(ParameterProvider.BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS, 4L); props.put(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_PERCENTAGE, 6); props.put(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_BYTES, 1024); - ParameterProvider parameterProvider = new ParameterProvider(null, props); + ParameterProvider parameterProvider = new ParameterProvider(null, props, false); Assert.assertEquals(3000, parameterProvider.getCachedMaxClientLagInMs()); Assert.assertEquals(4, parameterProvider.getBufferFlushCheckIntervalInMs()); @@ -85,7 +89,7 @@ public void withNullParameterMap() { @Test public void withNullInputs() { - ParameterProvider parameterProvider = new ParameterProvider(null, null); + ParameterProvider parameterProvider = new ParameterProvider(null, null, false); Assert.assertEquals( ParameterProvider.MAX_CLIENT_LAG_DEFAULT, parameterProvider.getCachedMaxClientLagInMs()); @@ -105,7 +109,7 @@ public void withNullInputs() { @Test public void withDefaultValues() { - ParameterProvider parameterProvider = new ParameterProvider(); + ParameterProvider parameterProvider = new ParameterProvider(false); Assert.assertEquals( ParameterProvider.MAX_CLIENT_LAG_DEFAULT, parameterProvider.getCachedMaxClientLagInMs()); @@ -137,12 +141,49 @@ public void withDefaultValues() { parameterProvider.getBdecParquetCompressionAlgorithm()); } + @Test + public void withIcebergDefaultValues() { + ParameterProvider parameterProvider = new ParameterProvider(true); + + Assert.assertEquals( + ParameterProvider.MAX_CLIENT_LAG_DEFAULT, parameterProvider.getCachedMaxClientLagInMs()); + Assert.assertEquals( + ParameterProvider.BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS_DEFAULT, + parameterProvider.getBufferFlushCheckIntervalInMs()); + Assert.assertEquals( + ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_PERCENTAGE_DEFAULT, + parameterProvider.getInsertThrottleThresholdInPercentage()); + Assert.assertEquals( + ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_BYTES_DEFAULT, + parameterProvider.getInsertThrottleThresholdInBytes()); + Assert.assertEquals( + ParameterProvider.INSERT_THROTTLE_INTERVAL_IN_MILLIS_DEFAULT, + parameterProvider.getInsertThrottleIntervalInMs()); + Assert.assertEquals( + ParameterProvider.IO_TIME_CPU_RATIO_DEFAULT, parameterProvider.getIOTimeCpuRatio()); + Assert.assertEquals( + ParameterProvider.BLOB_UPLOAD_MAX_RETRY_COUNT_DEFAULT, + parameterProvider.getBlobUploadMaxRetryCount()); + Assert.assertEquals( + ParameterProvider.MAX_MEMORY_LIMIT_IN_BYTES_DEFAULT, + parameterProvider.getMaxMemoryLimitInBytes()); + Assert.assertEquals( + ParameterProvider.MAX_CHANNEL_SIZE_IN_BYTES_DEFAULT, + parameterProvider.getMaxChannelSizeInBytes()); + Assert.assertEquals( + ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT, + parameterProvider.getBdecParquetCompressionAlgorithm()); + Assert.assertEquals( + MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_ICEBERG_MODE_DEFAULT, + 
parameterProvider.getMaxChunksInBlobAndRegistrationRequest()); + } + @Test public void testMaxClientLagEnabled() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "2 second"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); Assert.assertEquals(2000, parameterProvider.getCachedMaxClientLagInMs()); // call again to trigger caching logic Assert.assertEquals(2000, parameterProvider.getCachedMaxClientLagInMs()); @@ -153,7 +194,7 @@ public void testMaxClientLagEnabledPluralTimeUnit() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "2 seconds"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); Assert.assertEquals(2000, parameterProvider.getCachedMaxClientLagInMs()); } @@ -162,7 +203,7 @@ public void testMaxClientLagEnabledMinuteTimeUnit() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "1 minute"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); Assert.assertEquals(60000, parameterProvider.getCachedMaxClientLagInMs()); } @@ -171,7 +212,7 @@ public void testMaxClientLagEnabledMinuteTimeUnitPluralTimeUnit() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "2 minutes"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); Assert.assertEquals(120000, parameterProvider.getCachedMaxClientLagInMs()); } @@ -179,7 +220,7 @@ public void testMaxClientLagEnabledMinuteTimeUnitPluralTimeUnit() { public void testMaxClientLagEnabledDefaultValue() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); Assert.assertEquals( ParameterProvider.MAX_CLIENT_LAG_DEFAULT, parameterProvider.getCachedMaxClientLagInMs()); } @@ -189,7 +230,7 @@ public void testMaxClientLagEnabledDefaultUnit() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "3000"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); Assert.assertEquals(3000, parameterProvider.getCachedMaxClientLagInMs()); } @@ -198,7 +239,7 @@ public void testMaxClientLagEnabledLongInput() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, 3000L); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); Assert.assertEquals(3000, parameterProvider.getCachedMaxClientLagInMs()); } @@ -207,7 +248,7 @@ public void 
testMaxClientLagEnabledMissingUnitTimeUnitSupplied() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, " year"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); try { parameterProvider.getCachedMaxClientLagInMs(); Assert.fail("Should not have succeeded"); @@ -221,7 +262,7 @@ public void testMaxClientLagEnabledInvalidTimeUnit() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "1 year"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); try { parameterProvider.getCachedMaxClientLagInMs(); Assert.fail("Should not have succeeded"); @@ -235,7 +276,7 @@ public void testMaxClientLagEnabledInvalidUnit() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "banana minute"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); try { parameterProvider.getCachedMaxClientLagInMs(); Assert.fail("Should not have succeeded"); @@ -249,7 +290,7 @@ public void testMaxClientLagEnabledThresholdBelow() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "0 second"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); try { parameterProvider.getCachedMaxClientLagInMs(); Assert.fail("Should not have succeeded"); @@ -263,7 +304,7 @@ public void testMaxClientLagEnabledThresholdAbove() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "11 minutes"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); try { parameterProvider.getCachedMaxClientLagInMs(); Assert.fail("Should not have succeeded"); @@ -277,7 +318,7 @@ public void testMaxClientLagEnableEmptyInput() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, ""); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); try { parameterProvider.getCachedMaxClientLagInMs(); Assert.fail("Should not have succeeded"); @@ -291,8 +332,20 @@ public void testMaxChunksInBlobAndRegistrationRequest() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put("max_chunks_in_blob_and_registration_request", 1); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); + Assert.assertEquals(1, parameterProvider.getMaxChunksInBlobAndRegistrationRequest()); + + parameterProvider = new ParameterProvider(parameterMap, prop, true); Assert.assertEquals(1, parameterProvider.getMaxChunksInBlobAndRegistrationRequest()); + + 
SFException e = + Assert.assertThrows( + SFException.class, + () -> { + parameterMap.put("max_chunks_in_blob_and_registration_request", 100); + new ParameterProvider(parameterMap, prop, true); + }); + Assert.assertEquals(e.getVendorCode(), ErrorCode.INVALID_CONFIG_PARAMETER.getMessageCode()); } @Test @@ -303,7 +356,7 @@ public void testValidCompressionAlgorithmsAndWithUppercaseLowerCase() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM, v); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); Assert.assertEquals( Constants.BdecParquetCompression.GZIP, parameterProvider.getBdecParquetCompressionAlgorithm()); @@ -314,7 +367,7 @@ public void testValidCompressionAlgorithmsAndWithUppercaseLowerCase() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM, v); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); Assert.assertEquals( Constants.BdecParquetCompression.ZSTD, parameterProvider.getBdecParquetCompressionAlgorithm()); @@ -326,7 +379,7 @@ public void testInvalidCompressionAlgorithm() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM, "invalid_comp"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); try { parameterProvider.getBdecParquetCompressionAlgorithm(); Assert.fail("Should not have succeeded"); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/RegisterServiceTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/RegisterServiceTest.java index 37eb5f96e..4eaea15a4 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/RegisterServiceTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/RegisterServiceTest.java @@ -14,8 +14,17 @@ import org.junit.Assert; import org.junit.Ignore; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +@RunWith(Parameterized.class) public class RegisterServiceTest { + @Parameterized.Parameters(name = "isIcebergMode: {0}") + public static Object[] isIcebergMode() { + return new Object[] {false, true}; + } + + @Parameterized.Parameter public boolean isIcebergMode; @Test public void testRegisterService() throws ExecutionException, InterruptedException { @@ -45,7 +54,7 @@ public void testRegisterService() throws ExecutionException, InterruptedExceptio @Test public void testRegisterServiceTimeoutException() throws Exception { SnowflakeStreamingIngestClientInternal client = - new SnowflakeStreamingIngestClientInternal<>("client"); + new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode); RegisterService rs = new RegisterService<>(client, true); Pair, CompletableFuture> blobFuture1 = @@ -73,7 +82,7 @@ public void testRegisterServiceTimeoutException() throws Exception { @Test public void testRegisterServiceTimeoutException_testRetries() throws Exception { SnowflakeStreamingIngestClientInternal client = - new SnowflakeStreamingIngestClientInternal<>("client"); + new 
SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode); RegisterService rs = new RegisterService<>(client, true); Pair, CompletableFuture> blobFuture1 = @@ -107,7 +116,7 @@ public void testRegisterServiceTimeoutException_testRetries() throws Exception { @Test public void testRegisterServiceNonTimeoutException() { SnowflakeStreamingIngestClientInternal client = - new SnowflakeStreamingIngestClientInternal<>("client"); + new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode); RegisterService rs = new RegisterService<>(client, true); CompletableFuture future = new CompletableFuture<>(); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java index 87e3f8f11..5beb0662f 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java @@ -44,8 +44,11 @@ import net.snowflake.ingest.utils.Utils; import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.mockito.Mockito; +@RunWith(Parameterized.class) public class SnowflakeStreamingIngestChannelTest { /** @@ -72,6 +75,13 @@ public long getFreeMemory() { } } + @Parameterized.Parameters(name = "isIcebergMode: {0}") + public static Object[] isIcebergMode() { + return new Object[] {false, true}; + } + + @Parameterized.Parameter public boolean isIcebergMode; + @Test public void testChannelFactoryNullFields() { String name = "CHANNEL"; @@ -81,7 +91,7 @@ public void testChannelFactoryNullFields() { long channelSequencer = 0L; long rowSequencer = 0L; SnowflakeStreamingIngestClientInternal client = - new SnowflakeStreamingIngestClientInternal<>("client"); + new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode); Object[] fields = new Object[] { @@ -123,7 +133,7 @@ public void testChannelFactorySuccess() { long rowSequencer = 0L; SnowflakeStreamingIngestClientInternal client = - new SnowflakeStreamingIngestClientInternal<>("client"); + new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode); SnowflakeStreamingIngestChannelInternal channel = SnowflakeStreamingIngestChannelFactory.builder(name) @@ -158,7 +168,7 @@ public void testChannelFactorySuccess() { @Test public void testChannelValid() { SnowflakeStreamingIngestClientInternal client = - new SnowflakeStreamingIngestClientInternal<>("client"); + new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode); SnowflakeStreamingIngestChannelInternal channel = new SnowflakeStreamingIngestChannelInternal<>( "channel", @@ -208,7 +218,7 @@ public void testChannelValid() { @Test public void testChannelClose() { SnowflakeStreamingIngestClientInternal client = - new SnowflakeStreamingIngestClientInternal<>("client"); + new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode); SnowflakeStreamingIngestChannelInternal channel = new SnowflakeStreamingIngestChannelInternal<>( "channel", @@ -347,6 +357,7 @@ public void testOpenChannelErrorResponse() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + isIcebergMode, true, requestBuilder, null); @@ -417,6 +428,7 @@ public void testOpenChannelSnowflakeInternalErrorResponse() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + isIcebergMode, true, requestBuilder, null); @@ 
-499,6 +511,7 @@ public void testOpenChannelSuccessResponse() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + isIcebergMode, true, requestBuilder, null); @@ -537,7 +550,9 @@ public void testOpenChannelSuccessResponse() throws Exception { @Test public void testInsertRow() { SnowflakeStreamingIngestClientInternal client; - client = new SnowflakeStreamingIngestClientInternal("client_PARQUET"); + client = + new SnowflakeStreamingIngestClientInternal( + "client_PARQUET", isIcebergMode); SnowflakeStreamingIngestChannelInternal channel = new SnowflakeStreamingIngestChannelInternal<>( "channel", @@ -621,7 +636,8 @@ public void testInsertTooLargeRow() { schema.forEach(x -> row.put(x.getName(), byteArrayOneMb)); SnowflakeStreamingIngestClientInternal client; - client = new SnowflakeStreamingIngestClientInternal("test_client"); + client = + new SnowflakeStreamingIngestClientInternal("test_client", isIcebergMode); // Test channel with on error CONTINUE SnowflakeStreamingIngestChannelInternal channel = @@ -705,7 +721,7 @@ public void testInsertRowThrottling() { memoryInfoProvider.maxMemory = maxMemory; SnowflakeStreamingIngestClientInternal client = - new SnowflakeStreamingIngestClientInternal<>("client"); + new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode); SnowflakeStreamingIngestChannelInternal channel = new SnowflakeStreamingIngestChannelInternal<>( "channel", @@ -721,7 +737,7 @@ public void testInsertRowThrottling() { OpenChannelRequest.OnErrorOption.CONTINUE, UTC); - ParameterProvider parameterProvider = new ParameterProvider(); + ParameterProvider parameterProvider = new ParameterProvider(isIcebergMode); memoryInfoProvider.freeMemory = maxMemory * (parameterProvider.getInsertThrottleThresholdInPercentage() - 1) / 100; @@ -751,7 +767,7 @@ public void testInsertRowThrottling() { @Test public void testFlush() throws Exception { SnowflakeStreamingIngestClientInternal client = - Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client")); + Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode)); SnowflakeStreamingIngestChannelInternal channel = new SnowflakeStreamingIngestChannelInternal<>( "channel", @@ -787,7 +803,7 @@ public void testFlush() throws Exception { @Test public void testClose() throws Exception { SnowflakeStreamingIngestClientInternal client = - Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client")); + Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode)); SnowflakeStreamingIngestChannel channel = new SnowflakeStreamingIngestChannelInternal<>( "channel", @@ -821,7 +837,7 @@ public void testClose() throws Exception { @Test public void testDropOnClose() throws Exception { SnowflakeStreamingIngestClientInternal client = - Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client")); + Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode)); SnowflakeStreamingIngestChannelInternal channel = new SnowflakeStreamingIngestChannelInternal<>( "channel", @@ -858,7 +874,7 @@ public void testDropOnClose() throws Exception { @Test public void testDropOnCloseInvalidChannel() throws Exception { SnowflakeStreamingIngestClientInternal client = - Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client")); + Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode)); SnowflakeStreamingIngestChannelInternal channel = new SnowflakeStreamingIngestChannelInternal<>( "channel", @@ -891,7 +907,7 @@ public void 
testDropOnCloseInvalidChannel() throws Exception { public void testGetLatestCommittedOffsetToken() { String offsetToken = "10"; SnowflakeStreamingIngestClientInternal client = - Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client")); + Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode)); SnowflakeStreamingIngestChannel channel = new SnowflakeStreamingIngestChannelInternal<>( "channel", diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java index 1693e1520..4e66d8a15 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java @@ -66,9 +66,12 @@ import org.junit.Before; import org.junit.Ignore; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.mockito.ArgumentMatchers; import org.mockito.Mockito; +@RunWith(Parameterized.class) public class SnowflakeStreamingIngestClientTest { private static final ObjectMapper objectMapper = new ObjectMapper(); @@ -79,6 +82,14 @@ public class SnowflakeStreamingIngestClientTest { SnowflakeStreamingIngestChannelInternal channel3; SnowflakeStreamingIngestChannelInternal channel4; + // TODO: Add IcebergMode = True after Streaming to Iceberg is supported. + @Parameterized.Parameters(name = "isIcebergMode: {0}") + public static Object[] isIcebergMode() { + return new Object[] {false}; + } + + @Parameterized.Parameter public boolean isIcebergMode; + @Before public void setup() { objectMapper.setVisibility(PropertyAccessor.GETTER, JsonAutoDetect.Visibility.ANY); @@ -166,6 +177,7 @@ public void testConstructorParameters() throws Exception { SnowflakeStreamingIngestClientFactory.builder("client") .setProperties(prop) .setParameterOverrides(parameterMap) + .setIsIceberg(isIcebergMode) .setIsTestMode(true) .build(); @@ -193,6 +205,7 @@ public void testClientFactoryWithJmxMetrics() throws Exception { .setProperties(prop) .setParameterOverrides( Collections.singletonMap(ENABLE_SNOWPIPE_STREAMING_METRICS, true)) + .setIsIceberg(isIcebergMode) .build(); Assert.assertEquals("client", client.getName()); @@ -344,6 +357,7 @@ public void testGetChannelsStatusWithRequest() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + isIcebergMode, true, requestBuilder, null); @@ -404,6 +418,7 @@ public void testDropChannel() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + isIcebergMode, true, requestBuilder, null); @@ -449,6 +464,7 @@ public void testGetChannelsStatusWithRequestError() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + isIcebergMode, true, requestBuilder, null); @@ -677,6 +693,7 @@ public void testGetRetryBlobs() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + isIcebergMode, true, requestBuilder, null); @@ -718,6 +735,7 @@ public void testRegisterBlobErrorResponse() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + isIcebergMode, true, requestBuilder, null); @@ -765,6 +783,7 @@ public void testRegisterBlobSnowflakeInternalErrorResponse() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + isIcebergMode, true, requestBuilder, null); @@ -821,6 +840,7 @@ public void 
testRegisterBlobSuccessResponse() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + isIcebergMode, true, requestBuilder, null); @@ -905,6 +925,7 @@ public void testRegisterBlobsRetries() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + isIcebergMode, true, requestBuilder, null); @@ -934,6 +955,7 @@ public void testRegisterBlobChunkLimit() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + isIcebergMode, true, requestBuilder, null)); @@ -1106,6 +1128,7 @@ public void testRegisterBlobsRetriesSucceeds() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + isIcebergMode, true, requestBuilder, null); @@ -1180,6 +1203,7 @@ public void testRegisterBlobResponseWithInvalidChannel() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + isIcebergMode, true, requestBuilder, null); @@ -1230,7 +1254,7 @@ public void testRegisterBlobResponseWithInvalidChannel() throws Exception { @Test public void testFlush() throws Exception { SnowflakeStreamingIngestClientInternal client = - Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client")); + Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode)); ChannelsStatusResponse response = new ChannelsStatusResponse(); response.setStatusCode(0L); response.setMessage("Success"); @@ -1252,7 +1276,7 @@ public void testFlush() throws Exception { @Test public void testClose() throws Exception { SnowflakeStreamingIngestClientInternal client = - Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client")); + Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode)); ChannelsStatusResponse response = new ChannelsStatusResponse(); response.setStatusCode(0L); response.setMessage("Success"); @@ -1286,7 +1310,7 @@ public void testClose() throws Exception { @Test public void testCloseWithError() throws Exception { SnowflakeStreamingIngestClientInternal client = - Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client")); + Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode)); CompletableFuture future = new CompletableFuture<>(); future.completeExceptionally(new Exception("Simulating Error")); @@ -1324,7 +1348,7 @@ public void testCloseWithError() throws Exception { @Test public void testVerifyChannelsAreFullyCommittedSuccess() throws Exception { SnowflakeStreamingIngestClientInternal client = - Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client")); + Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode)); SnowflakeStreamingIngestChannelInternal channel = new SnowflakeStreamingIngestChannelInternal<>( "channel1", @@ -1372,6 +1396,7 @@ public void testFlushServiceException() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + isIcebergMode, true, requestBuilder, parameterMap); @@ -1408,6 +1433,7 @@ public void testGetLatestCommittedOffsetTokens() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + isIcebergMode, true, requestBuilder, null); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestStageTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestStageTest.java index 1ba9f98df..d12c6231c 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestStageTest.java +++ 
b/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestStageTest.java @@ -276,7 +276,7 @@ public void testRefreshSnowflakeMetadataRemote() throws Exception { Mockito.when(mockResponse.getEntity()).thenReturn(entity); Mockito.when(mockClient.execute(Mockito.any())).thenReturn(mockResponse); - ParameterProvider parameterProvider = new ParameterProvider(); + ParameterProvider parameterProvider = new ParameterProvider(false); StreamingIngestStage stage = new StreamingIngestStage(true, "role", mockClient, mockBuilder, "clientName", 1); From 4cd1beabdef48c9ee058bf061f05cdc180e24173 Mon Sep 17 00:00:00 2001 From: Alec Huang Date: Mon, 10 Jun 2024 17:32:05 -0700 Subject: [PATCH 2/2] Revert "SNOW-1437885 Disable blob interleaving when in Iceberg mode (#763)" This reverts commit c6dfbf31be807072e31bfe137f7c7dc7861fad20. --- ...SnowflakeStreamingIngestClientFactory.java | 16 +-- ...nowflakeStreamingIngestClientInternal.java | 16 +-- .../ingest/utils/ParameterProvider.java | 45 ++------- .../streaming/internal/ChannelCacheTest.java | 2 +- .../streaming/internal/FlushServiceTest.java | 20 +--- .../streaming/internal/OAuthBasicTest.java | 2 +- .../internal/ParameterProviderTest.java | 97 +++++-------------- .../internal/RegisterServiceTest.java | 15 +-- .../SnowflakeStreamingIngestChannelTest.java | 42 +++----- .../SnowflakeStreamingIngestClientTest.java | 34 +------ .../internal/StreamingIngestStageTest.java | 2 +- 11 files changed, 59 insertions(+), 232 deletions(-) diff --git a/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClientFactory.java b/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClientFactory.java index 89e528693..cd6d78787 100644 --- a/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClientFactory.java +++ b/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClientFactory.java @@ -28,10 +28,6 @@ public static class Builder { // Allows client to override some default parameter values private Map parameterOverrides; - // Indicates whether it's streaming to Iceberg tables. Open channels on regular tables should - // fail in this mode. 
- private boolean isIcebergMode; - // Indicates whether it's under test mode private boolean isTestMode; @@ -49,11 +45,6 @@ public Builder setParameterOverrides(Map parameterOverrides) { return this; } - public Builder setIsIceberg(boolean isIcebergMode) { - this.isIcebergMode = isIcebergMode; - return this; - } - public Builder setIsTestMode(boolean isTestMode) { this.isTestMode = isTestMode; return this; @@ -67,12 +58,7 @@ public SnowflakeStreamingIngestClient build() { SnowflakeURL accountURL = new SnowflakeURL(prop.getProperty(Constants.ACCOUNT_URL)); return new SnowflakeStreamingIngestClientInternal<>( - this.name, - accountURL, - prop, - this.parameterOverrides, - this.isIcebergMode, - this.isTestMode); + this.name, accountURL, prop, this.parameterOverrides, this.isTestMode); } } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java index 13cc673ff..2990b49d8 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java @@ -121,9 +121,6 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea // Indicates whether the client has closed private volatile boolean isClosed; - // Indicates wheter the client is streaming to Iceberg tables - private final boolean isIcebergMode; - // Indicates whether the client is under test mode private final boolean isTestMode; @@ -155,7 +152,6 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea * @param accountURL Snowflake account url * @param prop connection properties * @param httpClient http client for sending request - * @param isIcebergMode whether we're streaming to iceberg tables * @param isTestMode whether we're under test mode * @param requestBuilder http request builder * @param parameterOverrides parameters we override in case we want to set different values @@ -165,15 +161,13 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea SnowflakeURL accountURL, Properties prop, CloseableHttpClient httpClient, - boolean isIcebergMode, boolean isTestMode, RequestBuilder requestBuilder, Map parameterOverrides) { - this.parameterProvider = new ParameterProvider(parameterOverrides, prop, isIcebergMode); + this.parameterProvider = new ParameterProvider(parameterOverrides, prop); this.name = name; String accountName = accountURL == null ? null : accountURL.getAccount(); - this.isIcebergMode = isIcebergMode; this.isTestMode = isTestMode; this.httpClient = httpClient == null ? 
HttpUtil.getHttpClient(accountName) : httpClient; this.channelCache = new ChannelCache<>(); @@ -257,7 +251,6 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea * @param accountURL Snowflake account url * @param prop connection properties * @param parameterOverrides map of parameters to override for this client - * @param isIcebergMode whether we're streaming to iceberg tables * @param isTestMode indicates whether it's under test mode */ public SnowflakeStreamingIngestClientInternal( @@ -265,17 +258,16 @@ public SnowflakeStreamingIngestClientInternal( SnowflakeURL accountURL, Properties prop, Map parameterOverrides, - boolean isIcebergMode, boolean isTestMode) { - this(name, accountURL, prop, null, isIcebergMode, isTestMode, null, parameterOverrides); + this(name, accountURL, prop, null, isTestMode, null, parameterOverrides); } /*** Constructor for TEST ONLY * * @param name the name of the client */ - SnowflakeStreamingIngestClientInternal(String name, boolean isIcebergMode) { - this(name, null, null, null, isIcebergMode, true, null, new HashMap<>()); + SnowflakeStreamingIngestClientInternal(String name) { + this(name, null, null, null, true, null, new HashMap<>()); } // TESTING ONLY - inject the request builder diff --git a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java index 33f791ca5..b98972a7d 100644 --- a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java +++ b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java @@ -1,7 +1,5 @@ package net.snowflake.ingest.utils; -import static net.snowflake.ingest.utils.ErrorCode.INVALID_CONFIG_PARAMETER; - import java.util.HashMap; import java.util.Map; import java.util.Properties; @@ -66,10 +64,6 @@ public class ParameterProvider { public static final Constants.BdecParquetCompression BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT = Constants.BdecParquetCompression.GZIP; - /* Iceberg mode parameters: When streaming to Iceberg mode, different default parameters are required because it generates Parquet files instead of BDEC files. */ - public static final int MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_ICEBERG_MODE_DEFAULT = - 1; // 1 parquet file per blob - /* Parameter that enables using internal Parquet buffers for buffering of rows before serializing. 
It reduces memory consumption compared to using Java Objects for buffering.*/ public static final boolean ENABLE_PARQUET_INTERNAL_BUFFERING_DEFAULT = false; @@ -86,17 +80,14 @@ public class ParameterProvider { * * @param parameterOverrides Map of parameter name to value * @param props Properties from profile file - * @param isIcebergMode If the provided parameters need to be verified and modified to meet - * Iceberg mode */ - public ParameterProvider( - Map parameterOverrides, Properties props, boolean isIcebergMode) { - this.setParameterMap(parameterOverrides, props, isIcebergMode); + public ParameterProvider(Map parameterOverrides, Properties props) { + this.setParameterMap(parameterOverrides, props); } /** Empty constructor for tests */ - public ParameterProvider(boolean isIcebergMode) { - this(null, null, isIcebergMode); + public ParameterProvider() { + this(null, null); } private void updateValue( @@ -108,8 +99,6 @@ private void updateValue( this.parameterMap.put(key, parameterOverrides.getOrDefault(key, defaultValue)); } else if (props != null) { this.parameterMap.put(key, props.getOrDefault(key, defaultValue)); - } else { - this.parameterMap.put(key, defaultValue); } } @@ -118,11 +107,8 @@ private void updateValue( * * @param parameterOverrides Map of parameter name -> value * @param props Properties file provided to client constructor - * @param isIcebergMode If the provided parameters need to be verified and modified to meet - * Iceberg mode */ - private void setParameterMap( - Map parameterOverrides, Properties props, boolean isIcebergMode) { + private void setParameterMap(Map parameterOverrides, Properties props) { // BUFFER_FLUSH_INTERVAL_IN_MILLIS is deprecated and disallowed if ((parameterOverrides != null && parameterOverrides.containsKey(BUFFER_FLUSH_INTERVAL_IN_MILLIS)) @@ -193,9 +179,7 @@ private void setParameterMap( this.updateValue( MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST, - isIcebergMode - ? 
MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_ICEBERG_MODE_DEFAULT - : MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_DEFAULT, + MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_DEFAULT, parameterOverrides, props); @@ -204,13 +188,6 @@ private void setParameterMap( BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT, parameterOverrides, props); - - // Required parameter override for Iceberg mode - if (isIcebergMode) { - icebergModeValidation( - MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST, - MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_ICEBERG_MODE_DEFAULT); - } } /** @return Longest interval in milliseconds between buffer flushes */ @@ -434,14 +411,4 @@ public Constants.BdecParquetCompression getBdecParquetCompressionAlgorithm() { public String toString() { return "ParameterProvider{" + "parameterMap=" + parameterMap + '}'; } - - private void icebergModeValidation(String key, Object expected) { - Object val = this.parameterMap.get(key); - if (!val.equals(expected)) { - throw new SFException( - INVALID_CONFIG_PARAMETER, - String.format( - "The value %s for %s is invalid in Iceberg mode, should be %s.", val, key, expected)); - } - } } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java index 7c6d797b4..947908ef9 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java @@ -24,7 +24,7 @@ public class ChannelCacheTest { @Before public void setup() { cache = new ChannelCache<>(); - client = new SnowflakeStreamingIngestClientInternal<>("client", false); + client = new SnowflakeStreamingIngestClientInternal<>("client"); channel1 = new SnowflakeStreamingIngestChannelInternal<>( "channel1", diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java index e66306d1f..f200c7177 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java @@ -7,7 +7,6 @@ import static net.snowflake.ingest.utils.Constants.BLOB_NO_HEADER; import static net.snowflake.ingest.utils.Constants.BLOB_TAG_SIZE_IN_BYTES; import static net.snowflake.ingest.utils.Constants.BLOB_VERSION_SIZE_IN_BYTES; -import static net.snowflake.ingest.utils.ParameterProvider.MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_ICEBERG_MODE_DEFAULT; import static net.snowflake.ingest.utils.ParameterProvider.MAX_CHUNK_SIZE_IN_BYTES_DEFAULT; import com.codahale.metrics.Histogram; @@ -49,22 +48,11 @@ import net.snowflake.ingest.utils.SFException; import org.junit.Assert; import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatchers; import org.mockito.Mockito; -@RunWith(Parameterized.class) public class FlushServiceTest { - - @Parameterized.Parameters(name = "isIcebergMode: {0}") - public static Object[] isIcebergMode() { - return new Object[] {false, true}; - } - - @Parameterized.Parameter public static boolean isIcebergMode; - public FlushServiceTest() { this.testContextFactory = ParquetTestContext.createFactory(); } @@ -98,7 +86,7 @@ private abstract static class TestContext implements AutoCloseable { TestContext() { stage = Mockito.mock(StreamingIngestStage.class); Mockito.when(stage.getClientPrefix()).thenReturn("client_prefix"); - 
parameterProvider = new ParameterProvider(isIcebergMode); + parameterProvider = new ParameterProvider(); client = Mockito.mock(SnowflakeStreamingIngestClientInternal.class); Mockito.when(client.getParameterProvider()).thenReturn(parameterProvider); channelCache = new ChannelCache<>(); @@ -598,9 +586,7 @@ public void runTestBlobSplitDueToNumberOfChunks(int numberOfRows) throws Excepti Math.ceil( (double) numberOfRows / channelsPerTable - / (isIcebergMode - ? MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_ICEBERG_MODE_DEFAULT - : ParameterProvider.MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_DEFAULT)); + / ParameterProvider.MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_DEFAULT); final TestContext>> testContext = testContextFactory.create(); @@ -875,7 +861,7 @@ public void testInvalidateChannels() { // Create a new Client in order to not interfere with other tests SnowflakeStreamingIngestClientInternal client = Mockito.mock(SnowflakeStreamingIngestClientInternal.class); - ParameterProvider parameterProvider = new ParameterProvider(isIcebergMode); + ParameterProvider parameterProvider = new ParameterProvider(); ChannelCache channelCache = new ChannelCache<>(); Mockito.when(client.getChannelCache()).thenReturn(channelCache); Mockito.when(client.getParameterProvider()).thenReturn(parameterProvider); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/OAuthBasicTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/OAuthBasicTest.java index 0c0c5bb85..6351f267d 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/OAuthBasicTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/OAuthBasicTest.java @@ -114,7 +114,7 @@ public void testCreateOAuthClient() throws Exception { @Test public void testSetRefreshToken() throws Exception { SnowflakeStreamingIngestClientInternal client = - new SnowflakeStreamingIngestClientInternal<>("TEST_CLIENT", false); + new SnowflakeStreamingIngestClientInternal<>("TEST_CLIENT"); MockOAuthClient mockOAuthClient = new MockOAuthClient(); OAuthManager oAuthManager = diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java index fb73aaa66..86cece9c7 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java @@ -1,16 +1,12 @@ package net.snowflake.ingest.streaming.internal; -import static net.snowflake.ingest.utils.ParameterProvider.MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_ICEBERG_MODE_DEFAULT; - import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import net.snowflake.ingest.utils.Constants; -import net.snowflake.ingest.utils.ErrorCode; import net.snowflake.ingest.utils.ParameterProvider; -import net.snowflake.ingest.utils.SFException; import org.junit.Assert; import org.junit.Test; @@ -35,7 +31,7 @@ private Map getStartingParameterMap() { public void withValuesSet() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); Assert.assertEquals(1000L, parameterProvider.getCachedMaxClientLagInMs()); Assert.assertEquals(4L, parameterProvider.getBufferFlushCheckIntervalInMs()); @@ -58,7 +54,7 @@ public void withNullProps() { 
     parameterMap.put(ParameterProvider.BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS, 4L);
     parameterMap.put(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_PERCENTAGE, 6);
     parameterMap.put(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_BYTES, 1024);
-    ParameterProvider parameterProvider = new ParameterProvider(parameterMap, null, false);
+    ParameterProvider parameterProvider = new ParameterProvider(parameterMap, null);
 
     Assert.assertEquals(3000, parameterProvider.getCachedMaxClientLagInMs());
     Assert.assertEquals(4, parameterProvider.getBufferFlushCheckIntervalInMs());
@@ -76,7 +72,7 @@ public void withNullParameterMap() {
     props.put(ParameterProvider.BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS, 4L);
     props.put(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_PERCENTAGE, 6);
     props.put(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_BYTES, 1024);
-    ParameterProvider parameterProvider = new ParameterProvider(null, props, false);
+    ParameterProvider parameterProvider = new ParameterProvider(null, props);
 
     Assert.assertEquals(3000, parameterProvider.getCachedMaxClientLagInMs());
     Assert.assertEquals(4, parameterProvider.getBufferFlushCheckIntervalInMs());
@@ -89,7 +85,7 @@ public void withNullParameterMap() {
 
   @Test
   public void withNullInputs() {
-    ParameterProvider parameterProvider = new ParameterProvider(null, null, false);
+    ParameterProvider parameterProvider = new ParameterProvider(null, null);
 
     Assert.assertEquals(
         ParameterProvider.MAX_CLIENT_LAG_DEFAULT, parameterProvider.getCachedMaxClientLagInMs());
@@ -109,7 +105,7 @@ public void withNullInputs() {
 
   @Test
   public void withDefaultValues() {
-    ParameterProvider parameterProvider = new ParameterProvider(false);
+    ParameterProvider parameterProvider = new ParameterProvider();
 
     Assert.assertEquals(
         ParameterProvider.MAX_CLIENT_LAG_DEFAULT, parameterProvider.getCachedMaxClientLagInMs());
@@ -141,49 +137,12 @@ public void withDefaultValues() {
         parameterProvider.getBdecParquetCompressionAlgorithm());
   }
 
-  @Test
-  public void withIcebergDefaultValues() {
-    ParameterProvider parameterProvider = new ParameterProvider(true);
-
-    Assert.assertEquals(
-        ParameterProvider.MAX_CLIENT_LAG_DEFAULT, parameterProvider.getCachedMaxClientLagInMs());
-    Assert.assertEquals(
-        ParameterProvider.BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS_DEFAULT,
-        parameterProvider.getBufferFlushCheckIntervalInMs());
-    Assert.assertEquals(
-        ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_PERCENTAGE_DEFAULT,
-        parameterProvider.getInsertThrottleThresholdInPercentage());
-    Assert.assertEquals(
-        ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_BYTES_DEFAULT,
-        parameterProvider.getInsertThrottleThresholdInBytes());
-    Assert.assertEquals(
-        ParameterProvider.INSERT_THROTTLE_INTERVAL_IN_MILLIS_DEFAULT,
-        parameterProvider.getInsertThrottleIntervalInMs());
-    Assert.assertEquals(
-        ParameterProvider.IO_TIME_CPU_RATIO_DEFAULT, parameterProvider.getIOTimeCpuRatio());
-    Assert.assertEquals(
-        ParameterProvider.BLOB_UPLOAD_MAX_RETRY_COUNT_DEFAULT,
-        parameterProvider.getBlobUploadMaxRetryCount());
-    Assert.assertEquals(
-        ParameterProvider.MAX_MEMORY_LIMIT_IN_BYTES_DEFAULT,
-        parameterProvider.getMaxMemoryLimitInBytes());
-    Assert.assertEquals(
-        ParameterProvider.MAX_CHANNEL_SIZE_IN_BYTES_DEFAULT,
-        parameterProvider.getMaxChannelSizeInBytes());
-    Assert.assertEquals(
-        ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT,
-        parameterProvider.getBdecParquetCompressionAlgorithm());
-    Assert.assertEquals(
-        MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_ICEBERG_MODE_DEFAULT,
-        parameterProvider.getMaxChunksInBlobAndRegistrationRequest());
-  }
-
   @Test
   public void testMaxClientLagEnabled() {
     Properties prop = new Properties();
     Map parameterMap = getStartingParameterMap();
     parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "2 second");
-    ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false);
+    ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop);
     Assert.assertEquals(2000, parameterProvider.getCachedMaxClientLagInMs());
     // call again to trigger caching logic
     Assert.assertEquals(2000, parameterProvider.getCachedMaxClientLagInMs());
@@ -194,7 +153,7 @@ public void testMaxClientLagEnabledPluralTimeUnit() {
     Properties prop = new Properties();
     Map parameterMap = getStartingParameterMap();
     parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "2 seconds");
-    ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false);
+    ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop);
     Assert.assertEquals(2000, parameterProvider.getCachedMaxClientLagInMs());
   }
 
@@ -203,7 +162,7 @@ public void testMaxClientLagEnabledMinuteTimeUnit() {
     Properties prop = new Properties();
     Map parameterMap = getStartingParameterMap();
     parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "1 minute");
-    ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false);
+    ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop);
     Assert.assertEquals(60000, parameterProvider.getCachedMaxClientLagInMs());
   }
 
@@ -212,7 +171,7 @@ public void testMaxClientLagEnabledMinuteTimeUnitPluralTimeUnit() {
     Properties prop = new Properties();
     Map parameterMap = getStartingParameterMap();
     parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "2 minutes");
-    ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false);
+    ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop);
     Assert.assertEquals(120000, parameterProvider.getCachedMaxClientLagInMs());
   }
 
@@ -220,7 +179,7 @@ public void testMaxClientLagEnabledMinuteTimeUnitPluralTimeUnit() {
   public void testMaxClientLagEnabledDefaultValue() {
     Properties prop = new Properties();
     Map parameterMap = getStartingParameterMap();
-    ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false);
+    ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop);
     Assert.assertEquals(
         ParameterProvider.MAX_CLIENT_LAG_DEFAULT, parameterProvider.getCachedMaxClientLagInMs());
   }
@@ -230,7 +189,7 @@ public void testMaxClientLagEnabledDefaultUnit() {
     Properties prop = new Properties();
     Map parameterMap = getStartingParameterMap();
     parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "3000");
-    ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false);
+    ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop);
     Assert.assertEquals(3000, parameterProvider.getCachedMaxClientLagInMs());
   }
 
@@ -239,7 +198,7 @@ public void testMaxClientLagEnabledLongInput() {
     Properties prop = new Properties();
     Map parameterMap = getStartingParameterMap();
     parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, 3000L);
-    ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false);
+    ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop);
     Assert.assertEquals(3000, parameterProvider.getCachedMaxClientLagInMs());
   }
 
@@ -248,7 +207,7 @@ public void testMaxClientLagEnabledMissingUnitTimeUnitSupplied() {
     Properties prop = new Properties();
     Map parameterMap = getStartingParameterMap();
     parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, " year");
-    ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false);
+    ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop);
     try {
       parameterProvider.getCachedMaxClientLagInMs();
       Assert.fail("Should not have succeeded");
@@ -262,7 +221,7 @@ public void testMaxClientLagEnabledInvalidTimeUnit() {
     Properties prop = new Properties();
     Map parameterMap = getStartingParameterMap();
     parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "1 year");
-    ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false);
+    ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop);
     try {
       parameterProvider.getCachedMaxClientLagInMs();
       Assert.fail("Should not have succeeded");
@@ -276,7 +235,7 @@ public void testMaxClientLagEnabledInvalidUnit() {
     Properties prop = new Properties();
     Map parameterMap = getStartingParameterMap();
     parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "banana minute");
-    ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false);
+    ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop);
     try {
       parameterProvider.getCachedMaxClientLagInMs();
       Assert.fail("Should not have succeeded");
@@ -290,7 +249,7 @@ public void testMaxClientLagEnabledThresholdBelow() {
     Properties prop = new Properties();
     Map parameterMap = getStartingParameterMap();
     parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "0 second");
-    ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false);
+    ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop);
     try {
       parameterProvider.getCachedMaxClientLagInMs();
       Assert.fail("Should not have succeeded");
@@ -304,7 +263,7 @@ public void testMaxClientLagEnabledThresholdAbove() {
     Properties prop = new Properties();
     Map parameterMap = getStartingParameterMap();
     parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "11 minutes");
-    ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false);
+    ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop);
     try {
       parameterProvider.getCachedMaxClientLagInMs();
       Assert.fail("Should not have succeeded");
@@ -318,7 +277,7 @@ public void testMaxClientLagEnableEmptyInput() {
     Properties prop = new Properties();
     Map parameterMap = getStartingParameterMap();
     parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "");
-    ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false);
+    ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop);
     try {
       parameterProvider.getCachedMaxClientLagInMs();
       Assert.fail("Should not have succeeded");
@@ -332,20 +291,8 @@ public void testMaxChunksInBlobAndRegistrationRequest() {
     Properties prop = new Properties();
     Map parameterMap = getStartingParameterMap();
     parameterMap.put("max_chunks_in_blob_and_registration_request", 1);
-    ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false);
-    Assert.assertEquals(1, parameterProvider.getMaxChunksInBlobAndRegistrationRequest());
-
-    parameterProvider = new ParameterProvider(parameterMap, prop, true);
+    ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop);
     Assert.assertEquals(1, parameterProvider.getMaxChunksInBlobAndRegistrationRequest());
-
-    SFException e =
-        Assert.assertThrows(
-            SFException.class,
-            () -> {
-              parameterMap.put("max_chunks_in_blob_and_registration_request", 100);
-              new ParameterProvider(parameterMap, prop, true);
-            });
-    Assert.assertEquals(e.getVendorCode(), ErrorCode.INVALID_CONFIG_PARAMETER.getMessageCode());
   }
 
   @Test
@@ -356,7 +303,7 @@ public void testValidCompressionAlgorithmsAndWithUppercaseLowerCase() {
           Properties prop = new Properties();
           Map parameterMap = getStartingParameterMap();
           parameterMap.put(ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM, v);
-          ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false);
+          ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop);
           Assert.assertEquals(
               Constants.BdecParquetCompression.GZIP,
               parameterProvider.getBdecParquetCompressionAlgorithm());
@@ -367,7 +314,7 @@ public void testValidCompressionAlgorithmsAndWithUppercaseLowerCase() {
           Properties prop = new Properties();
           Map parameterMap = getStartingParameterMap();
           parameterMap.put(ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM, v);
-          ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false);
+          ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop);
           Assert.assertEquals(
               Constants.BdecParquetCompression.ZSTD,
               parameterProvider.getBdecParquetCompressionAlgorithm());
@@ -379,7 +326,7 @@ public void testInvalidCompressionAlgorithm() {
     Properties prop = new Properties();
     Map parameterMap = getStartingParameterMap();
     parameterMap.put(ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM, "invalid_comp");
-    ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false);
+    ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop);
     try {
       parameterProvider.getBdecParquetCompressionAlgorithm();
       Assert.fail("Should not have succeeded");
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/RegisterServiceTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/RegisterServiceTest.java
index 4eaea15a4..37eb5f96e 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/RegisterServiceTest.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/RegisterServiceTest.java
@@ -14,17 +14,8 @@
 import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
 
-@RunWith(Parameterized.class)
 public class RegisterServiceTest {
-  @Parameterized.Parameters(name = "isIcebergMode: {0}")
-  public static Object[] isIcebergMode() {
-    return new Object[] {false, true};
-  }
-
-  @Parameterized.Parameter public boolean isIcebergMode;
 
   @Test
   public void testRegisterService() throws ExecutionException, InterruptedException {
@@ -54,7 +45,7 @@ public void testRegisterService() throws ExecutionException, InterruptedExceptio
   @Test
   public void testRegisterServiceTimeoutException() throws Exception {
     SnowflakeStreamingIngestClientInternal client =
-        new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode);
+        new SnowflakeStreamingIngestClientInternal<>("client");
     RegisterService rs = new RegisterService<>(client, true);
 
     Pair, CompletableFuture> blobFuture1 =
@@ -82,7 +73,7 @@ public void testRegisterServiceTimeoutException_testRetries() throws Exception {
     SnowflakeStreamingIngestClientInternal client =
-        new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode);
+        new SnowflakeStreamingIngestClientInternal<>("client");
     RegisterService rs = new RegisterService<>(client, true);
 
     Pair, CompletableFuture> blobFuture1 =
@@ -116,7 +107,7 @@ public void testRegisterServiceTimeoutException_testRetries() throws Exception {
   @Test
   public void testRegisterServiceNonTimeoutException() {
     SnowflakeStreamingIngestClientInternal client =
-        new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode);
+        new SnowflakeStreamingIngestClientInternal<>("client");
     RegisterService rs = new RegisterService<>(client, true);
 
     CompletableFuture future = new CompletableFuture<>();
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java
index 5beb0662f..87e3f8f11 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java
@@ -44,11 +44,8 @@
 import net.snowflake.ingest.utils.Utils;
 import org.junit.Assert;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
 import org.mockito.Mockito;
 
-@RunWith(Parameterized.class)
 public class SnowflakeStreamingIngestChannelTest {
 
   /**
@@ -75,13 +72,6 @@ public long getFreeMemory() {
     }
   }
 
-  @Parameterized.Parameters(name = "isIcebergMode: {0}")
-  public static Object[] isIcebergMode() {
-    return new Object[] {false, true};
-  }
-
-  @Parameterized.Parameter public boolean isIcebergMode;
-
   @Test
   public void testChannelFactoryNullFields() {
     String name = "CHANNEL";
@@ -91,7 +81,7 @@ public void testChannelFactoryNullFields() {
     long channelSequencer = 0L;
     long rowSequencer = 0L;
     SnowflakeStreamingIngestClientInternal client =
-        new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode);
+        new SnowflakeStreamingIngestClientInternal<>("client");
 
     Object[] fields =
         new Object[] {
@@ -133,7 +123,7 @@ public void testChannelFactorySuccess() {
     long rowSequencer = 0L;
 
     SnowflakeStreamingIngestClientInternal client =
-        new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode);
+        new SnowflakeStreamingIngestClientInternal<>("client");
 
     SnowflakeStreamingIngestChannelInternal channel =
         SnowflakeStreamingIngestChannelFactory.builder(name)
@@ -168,7 +158,7 @@ public void testChannelFactorySuccess() {
   @Test
   public void testChannelValid() {
     SnowflakeStreamingIngestClientInternal client =
-        new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode);
+        new SnowflakeStreamingIngestClientInternal<>("client");
     SnowflakeStreamingIngestChannelInternal channel =
         new SnowflakeStreamingIngestChannelInternal<>(
             "channel",
@@ -218,7 +208,7 @@ public void testChannelValid() {
   @Test
   public void testChannelClose() {
     SnowflakeStreamingIngestClientInternal client =
-        new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode);
+        new SnowflakeStreamingIngestClientInternal<>("client");
     SnowflakeStreamingIngestChannelInternal channel =
         new SnowflakeStreamingIngestChannelInternal<>(
            "channel",
@@ -357,7 +347,6 @@ public void testOpenChannelErrorResponse() throws Exception {
             new SnowflakeURL("snowflake.dev.local:8082"),
             null,
             httpClient,
-            isIcebergMode,
             true,
             requestBuilder,
             null);
@@ -428,7 +417,6 @@ public void testOpenChannelSnowflakeInternalErrorResponse() throws Exception {
             new SnowflakeURL("snowflake.dev.local:8082"),
             null,
             httpClient,
-            isIcebergMode,
             true,
             requestBuilder,
             null);
@@ -511,7 +499,6 @@ public void testOpenChannelSuccessResponse() throws Exception {
             new SnowflakeURL("snowflake.dev.local:8082"),
             null,
             httpClient,
-            isIcebergMode,
             true,
             requestBuilder,
             null);
@@ -550,9 +537,7 @@ public void testOpenChannelSuccessResponse() throws Exception {
   @Test
   public void testInsertRow() {
     SnowflakeStreamingIngestClientInternal client;
-    client =
-        new SnowflakeStreamingIngestClientInternal(
-            "client_PARQUET", isIcebergMode);
+    client = new SnowflakeStreamingIngestClientInternal("client_PARQUET");
     SnowflakeStreamingIngestChannelInternal channel =
         new SnowflakeStreamingIngestChannelInternal<>(
             "channel",
@@ -636,8 +621,7 @@ public void testInsertTooLargeRow() {
     schema.forEach(x -> row.put(x.getName(), byteArrayOneMb));
 
     SnowflakeStreamingIngestClientInternal client;
-    client =
-        new SnowflakeStreamingIngestClientInternal("test_client", isIcebergMode);
+    client = new SnowflakeStreamingIngestClientInternal("test_client");
 
     // Test channel with on error CONTINUE
     SnowflakeStreamingIngestChannelInternal channel =
@@ -721,7 +705,7 @@ public void testInsertRowThrottling() {
     memoryInfoProvider.maxMemory = maxMemory;
 
     SnowflakeStreamingIngestClientInternal client =
-        new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode);
+        new SnowflakeStreamingIngestClientInternal<>("client");
     SnowflakeStreamingIngestChannelInternal channel =
         new SnowflakeStreamingIngestChannelInternal<>(
            "channel",
@@ -737,7 +721,7 @@ public void testInsertRowThrottling() {
             OpenChannelRequest.OnErrorOption.CONTINUE,
             UTC);
 
-    ParameterProvider parameterProvider = new ParameterProvider(isIcebergMode);
+    ParameterProvider parameterProvider = new ParameterProvider();
     memoryInfoProvider.freeMemory =
         maxMemory * (parameterProvider.getInsertThrottleThresholdInPercentage() - 1) / 100;
@@ -767,7 +751,7 @@ public void testInsertRowThrottling() {
   @Test
   public void testFlush() throws Exception {
     SnowflakeStreamingIngestClientInternal client =
-        Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode));
+        Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client"));
     SnowflakeStreamingIngestChannelInternal channel =
         new SnowflakeStreamingIngestChannelInternal<>(
             "channel",
@@ -803,7 +787,7 @@ public void testFlush() throws Exception {
   @Test
   public void testClose() throws Exception {
     SnowflakeStreamingIngestClientInternal client =
-        Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode));
+        Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client"));
     SnowflakeStreamingIngestChannel channel =
         new SnowflakeStreamingIngestChannelInternal<>(
             "channel",
@@ -837,7 +821,7 @@ public void testClose() throws Exception {
   @Test
   public void testDropOnClose() throws Exception {
     SnowflakeStreamingIngestClientInternal client =
-        Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode));
+        Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client"));
     SnowflakeStreamingIngestChannelInternal channel =
         new SnowflakeStreamingIngestChannelInternal<>(
             "channel",
@@ -874,7 +858,7 @@ public void testDropOnClose() throws Exception {
   @Test
   public void testDropOnCloseInvalidChannel() throws Exception {
     SnowflakeStreamingIngestClientInternal client =
-        Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode));
+        Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client"));
     SnowflakeStreamingIngestChannelInternal channel =
         new SnowflakeStreamingIngestChannelInternal<>(
             "channel",
@@ -907,7 +891,7 @@ public void testDropOnCloseInvalidChannel() throws Exception {
   public void testGetLatestCommittedOffsetToken() {
     String offsetToken = "10";
     SnowflakeStreamingIngestClientInternal client =
-        Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode));
+        Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client"));
     SnowflakeStreamingIngestChannel channel =
         new SnowflakeStreamingIngestChannelInternal<>(
             "channel",
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java
index 4e66d8a15..1693e1520 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java
@@ -66,12 +66,9 @@
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
 import org.mockito.ArgumentMatchers;
 import org.mockito.Mockito;
 
-@RunWith(Parameterized.class)
 public class SnowflakeStreamingIngestClientTest {
   private static final ObjectMapper objectMapper = new ObjectMapper();
 
@@ -82,14 +79,6 @@ public class SnowflakeStreamingIngestClientTest {
   SnowflakeStreamingIngestChannelInternal channel3;
   SnowflakeStreamingIngestChannelInternal channel4;
 
-  // TODO: Add IcebergMode = True after Streaming to Iceberg is supported.
-  @Parameterized.Parameters(name = "isIcebergMode: {0}")
-  public static Object[] isIcebergMode() {
-    return new Object[] {false};
-  }
-
-  @Parameterized.Parameter public boolean isIcebergMode;
-
   @Before
   public void setup() {
     objectMapper.setVisibility(PropertyAccessor.GETTER, JsonAutoDetect.Visibility.ANY);
@@ -177,7 +166,6 @@ public void testConstructorParameters() throws Exception {
         SnowflakeStreamingIngestClientFactory.builder("client")
             .setProperties(prop)
             .setParameterOverrides(parameterMap)
-            .setIsIceberg(isIcebergMode)
             .setIsTestMode(true)
             .build();
 
@@ -205,7 +193,6 @@ public void testClientFactoryWithJmxMetrics() throws Exception {
            .setProperties(prop)
            .setParameterOverrides(
                Collections.singletonMap(ENABLE_SNOWPIPE_STREAMING_METRICS, true))
-           .setIsIceberg(isIcebergMode)
            .build();
 
     Assert.assertEquals("client", client.getName());
@@ -357,7 +344,6 @@ public void testGetChannelsStatusWithRequest() throws Exception {
             new SnowflakeURL("snowflake.dev.local:8082"),
             null,
             httpClient,
-            isIcebergMode,
             true,
             requestBuilder,
             null);
@@ -418,7 +404,6 @@ public void testDropChannel() throws Exception {
             new SnowflakeURL("snowflake.dev.local:8082"),
             null,
             httpClient,
-            isIcebergMode,
             true,
             requestBuilder,
             null);
@@ -464,7 +449,6 @@ public void testGetChannelsStatusWithRequestError() throws Exception {
             new SnowflakeURL("snowflake.dev.local:8082"),
             null,
             httpClient,
-            isIcebergMode,
             true,
             requestBuilder,
             null);
@@ -693,7 +677,6 @@ public void testGetRetryBlobs() throws Exception {
             new SnowflakeURL("snowflake.dev.local:8082"),
             null,
             httpClient,
-            isIcebergMode,
             true,
             requestBuilder,
             null);
@@ -735,7 +718,6 @@ public void testRegisterBlobErrorResponse() throws Exception {
             new SnowflakeURL("snowflake.dev.local:8082"),
             null,
             httpClient,
-            isIcebergMode,
             true,
             requestBuilder,
             null);
@@ -783,7 +765,6 @@ public void testRegisterBlobSnowflakeInternalErrorResponse() throws Exception {
             new SnowflakeURL("snowflake.dev.local:8082"),
             null,
             httpClient,
-            isIcebergMode,
             true,
             requestBuilder,
             null);
@@ -840,7 +821,6 @@ public void testRegisterBlobSuccessResponse() throws Exception {
             new SnowflakeURL("snowflake.dev.local:8082"),
             null,
             httpClient,
-            isIcebergMode,
             true,
             requestBuilder,
             null);
@@ -925,7 +905,6 @@ public void testRegisterBlobsRetries() throws Exception {
             new SnowflakeURL("snowflake.dev.local:8082"),
             null,
             httpClient,
-            isIcebergMode,
             true,
             requestBuilder,
             null);
@@ -955,7 +934,6 @@ public void testRegisterBlobChunkLimit() throws Exception {
             new SnowflakeURL("snowflake.dev.local:8082"),
             null,
             httpClient,
-            isIcebergMode,
             true,
             requestBuilder,
             null));
@@ -1128,7 +1106,6 @@ public void testRegisterBlobsRetriesSucceeds() throws Exception {
             new SnowflakeURL("snowflake.dev.local:8082"),
             null,
             httpClient,
-            isIcebergMode,
             true,
             requestBuilder,
             null);
@@ -1203,7 +1180,6 @@ public void testRegisterBlobResponseWithInvalidChannel() throws Exception {
             new SnowflakeURL("snowflake.dev.local:8082"),
             null,
             httpClient,
-            isIcebergMode,
             true,
             requestBuilder,
             null);
@@ -1254,7 +1230,7 @@ public void testRegisterBlobResponseWithInvalidChannel() throws Exception {
   @Test
   public void testFlush() throws Exception {
     SnowflakeStreamingIngestClientInternal client =
-        Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode));
+        Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client"));
     ChannelsStatusResponse response = new ChannelsStatusResponse();
     response.setStatusCode(0L);
     response.setMessage("Success");
@@ -1276,7 +1252,7 @@ public void testFlush() throws Exception {
   @Test
   public void testClose() throws Exception {
     SnowflakeStreamingIngestClientInternal client =
-        Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode));
+        Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client"));
     ChannelsStatusResponse response = new ChannelsStatusResponse();
     response.setStatusCode(0L);
     response.setMessage("Success");
@@ -1310,7 +1286,7 @@ public void testCloseWithError() throws Exception {
     SnowflakeStreamingIngestClientInternal client =
-        Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode));
+        Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client"));
 
     CompletableFuture future = new CompletableFuture<>();
     future.completeExceptionally(new Exception("Simulating Error"));
@@ -1348,7 +1324,7 @@ public void testVerifyChannelsAreFullyCommittedSuccess() throws Exception {
     SnowflakeStreamingIngestClientInternal client =
-        Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode));
+        Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client"));
     SnowflakeStreamingIngestChannelInternal channel =
         new SnowflakeStreamingIngestChannelInternal<>(
             "channel1",
@@ -1396,7 +1372,6 @@ public void testFlushServiceException() throws Exception {
             new SnowflakeURL("snowflake.dev.local:8082"),
             null,
             httpClient,
-            isIcebergMode,
             true,
             requestBuilder,
             parameterMap);
@@ -1433,7 +1408,6 @@ public void testGetLatestCommittedOffsetTokens() throws Exception {
             new SnowflakeURL("snowflake.dev.local:8082"),
             null,
             httpClient,
-            isIcebergMode,
             true,
             requestBuilder,
             null);
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestStageTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestStageTest.java
index d12c6231c..1ba9f98df 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestStageTest.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestStageTest.java
@@ -276,7 +276,7 @@ public void testRefreshSnowflakeMetadataRemote() throws Exception {
     Mockito.when(mockResponse.getEntity()).thenReturn(entity);
     Mockito.when(mockClient.execute(Mockito.any())).thenReturn(mockResponse);
 
-    ParameterProvider parameterProvider = new ParameterProvider(false);
+    ParameterProvider parameterProvider = new ParameterProvider();
     StreamingIngestStage stage =
         new StreamingIngestStage(true, "role", mockClient, mockBuilder, "clientName", 1);