From 4cd1beabdef48c9ee058bf061f05cdc180e24173 Mon Sep 17 00:00:00 2001 From: Alec Huang Date: Mon, 10 Jun 2024 17:32:05 -0700 Subject: [PATCH] Revert "SNOW-1437885 Disable blob interleaving when in Iceberg mode (#763)" This reverts commit c6dfbf31be807072e31bfe137f7c7dc7861fad20. --- ...SnowflakeStreamingIngestClientFactory.java | 16 +-- ...nowflakeStreamingIngestClientInternal.java | 16 +-- .../ingest/utils/ParameterProvider.java | 45 ++------- .../streaming/internal/ChannelCacheTest.java | 2 +- .../streaming/internal/FlushServiceTest.java | 20 +--- .../streaming/internal/OAuthBasicTest.java | 2 +- .../internal/ParameterProviderTest.java | 97 +++++-------------- .../internal/RegisterServiceTest.java | 15 +-- .../SnowflakeStreamingIngestChannelTest.java | 42 +++----- .../SnowflakeStreamingIngestClientTest.java | 34 +------ .../internal/StreamingIngestStageTest.java | 2 +- 11 files changed, 59 insertions(+), 232 deletions(-) diff --git a/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClientFactory.java b/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClientFactory.java index 89e528693..cd6d78787 100644 --- a/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClientFactory.java +++ b/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClientFactory.java @@ -28,10 +28,6 @@ public static class Builder { // Allows client to override some default parameter values private Map parameterOverrides; - // Indicates whether it's streaming to Iceberg tables. Open channels on regular tables should - // fail in this mode. - private boolean isIcebergMode; - // Indicates whether it's under test mode private boolean isTestMode; @@ -49,11 +45,6 @@ public Builder setParameterOverrides(Map parameterOverrides) { return this; } - public Builder setIsIceberg(boolean isIcebergMode) { - this.isIcebergMode = isIcebergMode; - return this; - } - public Builder setIsTestMode(boolean isTestMode) { this.isTestMode = isTestMode; return this; @@ -67,12 +58,7 @@ public SnowflakeStreamingIngestClient build() { SnowflakeURL accountURL = new SnowflakeURL(prop.getProperty(Constants.ACCOUNT_URL)); return new SnowflakeStreamingIngestClientInternal<>( - this.name, - accountURL, - prop, - this.parameterOverrides, - this.isIcebergMode, - this.isTestMode); + this.name, accountURL, prop, this.parameterOverrides, this.isTestMode); } } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java index 13cc673ff..2990b49d8 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java @@ -121,9 +121,6 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea // Indicates whether the client has closed private volatile boolean isClosed; - // Indicates wheter the client is streaming to Iceberg tables - private final boolean isIcebergMode; - // Indicates whether the client is under test mode private final boolean isTestMode; @@ -155,7 +152,6 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea * @param accountURL Snowflake account url * @param prop connection properties * @param httpClient http client for sending request - * @param isIcebergMode whether we're streaming to iceberg tables * @param isTestMode whether 
we're under test mode * @param requestBuilder http request builder * @param parameterOverrides parameters we override in case we want to set different values @@ -165,15 +161,13 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea SnowflakeURL accountURL, Properties prop, CloseableHttpClient httpClient, - boolean isIcebergMode, boolean isTestMode, RequestBuilder requestBuilder, Map parameterOverrides) { - this.parameterProvider = new ParameterProvider(parameterOverrides, prop, isIcebergMode); + this.parameterProvider = new ParameterProvider(parameterOverrides, prop); this.name = name; String accountName = accountURL == null ? null : accountURL.getAccount(); - this.isIcebergMode = isIcebergMode; this.isTestMode = isTestMode; this.httpClient = httpClient == null ? HttpUtil.getHttpClient(accountName) : httpClient; this.channelCache = new ChannelCache<>(); @@ -257,7 +251,6 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea * @param accountURL Snowflake account url * @param prop connection properties * @param parameterOverrides map of parameters to override for this client - * @param isIcebergMode whether we're streaming to iceberg tables * @param isTestMode indicates whether it's under test mode */ public SnowflakeStreamingIngestClientInternal( @@ -265,17 +258,16 @@ public SnowflakeStreamingIngestClientInternal( SnowflakeURL accountURL, Properties prop, Map parameterOverrides, - boolean isIcebergMode, boolean isTestMode) { - this(name, accountURL, prop, null, isIcebergMode, isTestMode, null, parameterOverrides); + this(name, accountURL, prop, null, isTestMode, null, parameterOverrides); } /*** Constructor for TEST ONLY * * @param name the name of the client */ - SnowflakeStreamingIngestClientInternal(String name, boolean isIcebergMode) { - this(name, null, null, null, isIcebergMode, true, null, new HashMap<>()); + SnowflakeStreamingIngestClientInternal(String name) { + this(name, null, null, null, true, null, new HashMap<>()); } // TESTING ONLY - inject the request builder diff --git a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java index 33f791ca5..b98972a7d 100644 --- a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java +++ b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java @@ -1,7 +1,5 @@ package net.snowflake.ingest.utils; -import static net.snowflake.ingest.utils.ErrorCode.INVALID_CONFIG_PARAMETER; - import java.util.HashMap; import java.util.Map; import java.util.Properties; @@ -66,10 +64,6 @@ public class ParameterProvider { public static final Constants.BdecParquetCompression BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT = Constants.BdecParquetCompression.GZIP; - /* Iceberg mode parameters: When streaming to Iceberg mode, different default parameters are required because it generates Parquet files instead of BDEC files. */ - public static final int MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_ICEBERG_MODE_DEFAULT = - 1; // 1 parquet file per blob - /* Parameter that enables using internal Parquet buffers for buffering of rows before serializing. 
It reduces memory consumption compared to using Java Objects for buffering.*/ public static final boolean ENABLE_PARQUET_INTERNAL_BUFFERING_DEFAULT = false; @@ -86,17 +80,14 @@ public class ParameterProvider { * * @param parameterOverrides Map of parameter name to value * @param props Properties from profile file - * @param isIcebergMode If the provided parameters need to be verified and modified to meet - * Iceberg mode */ - public ParameterProvider( - Map parameterOverrides, Properties props, boolean isIcebergMode) { - this.setParameterMap(parameterOverrides, props, isIcebergMode); + public ParameterProvider(Map parameterOverrides, Properties props) { + this.setParameterMap(parameterOverrides, props); } /** Empty constructor for tests */ - public ParameterProvider(boolean isIcebergMode) { - this(null, null, isIcebergMode); + public ParameterProvider() { + this(null, null); } private void updateValue( @@ -108,8 +99,6 @@ private void updateValue( this.parameterMap.put(key, parameterOverrides.getOrDefault(key, defaultValue)); } else if (props != null) { this.parameterMap.put(key, props.getOrDefault(key, defaultValue)); - } else { - this.parameterMap.put(key, defaultValue); } } @@ -118,11 +107,8 @@ private void updateValue( * * @param parameterOverrides Map of parameter name -> value * @param props Properties file provided to client constructor - * @param isIcebergMode If the provided parameters need to be verified and modified to meet - * Iceberg mode */ - private void setParameterMap( - Map parameterOverrides, Properties props, boolean isIcebergMode) { + private void setParameterMap(Map parameterOverrides, Properties props) { // BUFFER_FLUSH_INTERVAL_IN_MILLIS is deprecated and disallowed if ((parameterOverrides != null && parameterOverrides.containsKey(BUFFER_FLUSH_INTERVAL_IN_MILLIS)) @@ -193,9 +179,7 @@ private void setParameterMap( this.updateValue( MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST, - isIcebergMode - ? 
MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_ICEBERG_MODE_DEFAULT - : MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_DEFAULT, + MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_DEFAULT, parameterOverrides, props); @@ -204,13 +188,6 @@ private void setParameterMap( BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT, parameterOverrides, props); - - // Required parameter override for Iceberg mode - if (isIcebergMode) { - icebergModeValidation( - MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST, - MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_ICEBERG_MODE_DEFAULT); - } } /** @return Longest interval in milliseconds between buffer flushes */ @@ -434,14 +411,4 @@ public Constants.BdecParquetCompression getBdecParquetCompressionAlgorithm() { public String toString() { return "ParameterProvider{" + "parameterMap=" + parameterMap + '}'; } - - private void icebergModeValidation(String key, Object expected) { - Object val = this.parameterMap.get(key); - if (!val.equals(expected)) { - throw new SFException( - INVALID_CONFIG_PARAMETER, - String.format( - "The value %s for %s is invalid in Iceberg mode, should be %s.", val, key, expected)); - } - } } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java index 7c6d797b4..947908ef9 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java @@ -24,7 +24,7 @@ public class ChannelCacheTest { @Before public void setup() { cache = new ChannelCache<>(); - client = new SnowflakeStreamingIngestClientInternal<>("client", false); + client = new SnowflakeStreamingIngestClientInternal<>("client"); channel1 = new SnowflakeStreamingIngestChannelInternal<>( "channel1", diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java index e66306d1f..f200c7177 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java @@ -7,7 +7,6 @@ import static net.snowflake.ingest.utils.Constants.BLOB_NO_HEADER; import static net.snowflake.ingest.utils.Constants.BLOB_TAG_SIZE_IN_BYTES; import static net.snowflake.ingest.utils.Constants.BLOB_VERSION_SIZE_IN_BYTES; -import static net.snowflake.ingest.utils.ParameterProvider.MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_ICEBERG_MODE_DEFAULT; import static net.snowflake.ingest.utils.ParameterProvider.MAX_CHUNK_SIZE_IN_BYTES_DEFAULT; import com.codahale.metrics.Histogram; @@ -49,22 +48,11 @@ import net.snowflake.ingest.utils.SFException; import org.junit.Assert; import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatchers; import org.mockito.Mockito; -@RunWith(Parameterized.class) public class FlushServiceTest { - - @Parameterized.Parameters(name = "isIcebergMode: {0}") - public static Object[] isIcebergMode() { - return new Object[] {false, true}; - } - - @Parameterized.Parameter public static boolean isIcebergMode; - public FlushServiceTest() { this.testContextFactory = ParquetTestContext.createFactory(); } @@ -98,7 +86,7 @@ private abstract static class TestContext implements AutoCloseable { TestContext() { stage = Mockito.mock(StreamingIngestStage.class); Mockito.when(stage.getClientPrefix()).thenReturn("client_prefix"); - 
parameterProvider = new ParameterProvider(isIcebergMode); + parameterProvider = new ParameterProvider(); client = Mockito.mock(SnowflakeStreamingIngestClientInternal.class); Mockito.when(client.getParameterProvider()).thenReturn(parameterProvider); channelCache = new ChannelCache<>(); @@ -598,9 +586,7 @@ public void runTestBlobSplitDueToNumberOfChunks(int numberOfRows) throws Excepti Math.ceil( (double) numberOfRows / channelsPerTable - / (isIcebergMode - ? MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_ICEBERG_MODE_DEFAULT - : ParameterProvider.MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_DEFAULT)); + / ParameterProvider.MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_DEFAULT); final TestContext>> testContext = testContextFactory.create(); @@ -875,7 +861,7 @@ public void testInvalidateChannels() { // Create a new Client in order to not interfere with other tests SnowflakeStreamingIngestClientInternal client = Mockito.mock(SnowflakeStreamingIngestClientInternal.class); - ParameterProvider parameterProvider = new ParameterProvider(isIcebergMode); + ParameterProvider parameterProvider = new ParameterProvider(); ChannelCache channelCache = new ChannelCache<>(); Mockito.when(client.getChannelCache()).thenReturn(channelCache); Mockito.when(client.getParameterProvider()).thenReturn(parameterProvider); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/OAuthBasicTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/OAuthBasicTest.java index 0c0c5bb85..6351f267d 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/OAuthBasicTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/OAuthBasicTest.java @@ -114,7 +114,7 @@ public void testCreateOAuthClient() throws Exception { @Test public void testSetRefreshToken() throws Exception { SnowflakeStreamingIngestClientInternal client = - new SnowflakeStreamingIngestClientInternal<>("TEST_CLIENT", false); + new SnowflakeStreamingIngestClientInternal<>("TEST_CLIENT"); MockOAuthClient mockOAuthClient = new MockOAuthClient(); OAuthManager oAuthManager = diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java index fb73aaa66..86cece9c7 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java @@ -1,16 +1,12 @@ package net.snowflake.ingest.streaming.internal; -import static net.snowflake.ingest.utils.ParameterProvider.MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_ICEBERG_MODE_DEFAULT; - import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import net.snowflake.ingest.utils.Constants; -import net.snowflake.ingest.utils.ErrorCode; import net.snowflake.ingest.utils.ParameterProvider; -import net.snowflake.ingest.utils.SFException; import org.junit.Assert; import org.junit.Test; @@ -35,7 +31,7 @@ private Map getStartingParameterMap() { public void withValuesSet() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); Assert.assertEquals(1000L, parameterProvider.getCachedMaxClientLagInMs()); Assert.assertEquals(4L, parameterProvider.getBufferFlushCheckIntervalInMs()); @@ -58,7 +54,7 @@ public void withNullProps() { 
parameterMap.put(ParameterProvider.BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS, 4L); parameterMap.put(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_PERCENTAGE, 6); parameterMap.put(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_BYTES, 1024); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, null, false); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, null); Assert.assertEquals(3000, parameterProvider.getCachedMaxClientLagInMs()); Assert.assertEquals(4, parameterProvider.getBufferFlushCheckIntervalInMs()); @@ -76,7 +72,7 @@ public void withNullParameterMap() { props.put(ParameterProvider.BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS, 4L); props.put(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_PERCENTAGE, 6); props.put(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_BYTES, 1024); - ParameterProvider parameterProvider = new ParameterProvider(null, props, false); + ParameterProvider parameterProvider = new ParameterProvider(null, props); Assert.assertEquals(3000, parameterProvider.getCachedMaxClientLagInMs()); Assert.assertEquals(4, parameterProvider.getBufferFlushCheckIntervalInMs()); @@ -89,7 +85,7 @@ public void withNullParameterMap() { @Test public void withNullInputs() { - ParameterProvider parameterProvider = new ParameterProvider(null, null, false); + ParameterProvider parameterProvider = new ParameterProvider(null, null); Assert.assertEquals( ParameterProvider.MAX_CLIENT_LAG_DEFAULT, parameterProvider.getCachedMaxClientLagInMs()); @@ -109,7 +105,7 @@ public void withNullInputs() { @Test public void withDefaultValues() { - ParameterProvider parameterProvider = new ParameterProvider(false); + ParameterProvider parameterProvider = new ParameterProvider(); Assert.assertEquals( ParameterProvider.MAX_CLIENT_LAG_DEFAULT, parameterProvider.getCachedMaxClientLagInMs()); @@ -141,49 +137,12 @@ public void withDefaultValues() { parameterProvider.getBdecParquetCompressionAlgorithm()); } - @Test - public void withIcebergDefaultValues() { - ParameterProvider parameterProvider = new ParameterProvider(true); - - Assert.assertEquals( - ParameterProvider.MAX_CLIENT_LAG_DEFAULT, parameterProvider.getCachedMaxClientLagInMs()); - Assert.assertEquals( - ParameterProvider.BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS_DEFAULT, - parameterProvider.getBufferFlushCheckIntervalInMs()); - Assert.assertEquals( - ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_PERCENTAGE_DEFAULT, - parameterProvider.getInsertThrottleThresholdInPercentage()); - Assert.assertEquals( - ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_BYTES_DEFAULT, - parameterProvider.getInsertThrottleThresholdInBytes()); - Assert.assertEquals( - ParameterProvider.INSERT_THROTTLE_INTERVAL_IN_MILLIS_DEFAULT, - parameterProvider.getInsertThrottleIntervalInMs()); - Assert.assertEquals( - ParameterProvider.IO_TIME_CPU_RATIO_DEFAULT, parameterProvider.getIOTimeCpuRatio()); - Assert.assertEquals( - ParameterProvider.BLOB_UPLOAD_MAX_RETRY_COUNT_DEFAULT, - parameterProvider.getBlobUploadMaxRetryCount()); - Assert.assertEquals( - ParameterProvider.MAX_MEMORY_LIMIT_IN_BYTES_DEFAULT, - parameterProvider.getMaxMemoryLimitInBytes()); - Assert.assertEquals( - ParameterProvider.MAX_CHANNEL_SIZE_IN_BYTES_DEFAULT, - parameterProvider.getMaxChannelSizeInBytes()); - Assert.assertEquals( - ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT, - parameterProvider.getBdecParquetCompressionAlgorithm()); - Assert.assertEquals( - MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_ICEBERG_MODE_DEFAULT, - 
parameterProvider.getMaxChunksInBlobAndRegistrationRequest()); - } - @Test public void testMaxClientLagEnabled() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "2 second"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); Assert.assertEquals(2000, parameterProvider.getCachedMaxClientLagInMs()); // call again to trigger caching logic Assert.assertEquals(2000, parameterProvider.getCachedMaxClientLagInMs()); @@ -194,7 +153,7 @@ public void testMaxClientLagEnabledPluralTimeUnit() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "2 seconds"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); Assert.assertEquals(2000, parameterProvider.getCachedMaxClientLagInMs()); } @@ -203,7 +162,7 @@ public void testMaxClientLagEnabledMinuteTimeUnit() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "1 minute"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); Assert.assertEquals(60000, parameterProvider.getCachedMaxClientLagInMs()); } @@ -212,7 +171,7 @@ public void testMaxClientLagEnabledMinuteTimeUnitPluralTimeUnit() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "2 minutes"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); Assert.assertEquals(120000, parameterProvider.getCachedMaxClientLagInMs()); } @@ -220,7 +179,7 @@ public void testMaxClientLagEnabledMinuteTimeUnitPluralTimeUnit() { public void testMaxClientLagEnabledDefaultValue() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); Assert.assertEquals( ParameterProvider.MAX_CLIENT_LAG_DEFAULT, parameterProvider.getCachedMaxClientLagInMs()); } @@ -230,7 +189,7 @@ public void testMaxClientLagEnabledDefaultUnit() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "3000"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); Assert.assertEquals(3000, parameterProvider.getCachedMaxClientLagInMs()); } @@ -239,7 +198,7 @@ public void testMaxClientLagEnabledLongInput() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, 3000L); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); Assert.assertEquals(3000, parameterProvider.getCachedMaxClientLagInMs()); } @@ -248,7 +207,7 @@ public void 
testMaxClientLagEnabledMissingUnitTimeUnitSupplied() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, " year"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); try { parameterProvider.getCachedMaxClientLagInMs(); Assert.fail("Should not have succeeded"); @@ -262,7 +221,7 @@ public void testMaxClientLagEnabledInvalidTimeUnit() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "1 year"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); try { parameterProvider.getCachedMaxClientLagInMs(); Assert.fail("Should not have succeeded"); @@ -276,7 +235,7 @@ public void testMaxClientLagEnabledInvalidUnit() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "banana minute"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); try { parameterProvider.getCachedMaxClientLagInMs(); Assert.fail("Should not have succeeded"); @@ -290,7 +249,7 @@ public void testMaxClientLagEnabledThresholdBelow() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "0 second"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); try { parameterProvider.getCachedMaxClientLagInMs(); Assert.fail("Should not have succeeded"); @@ -304,7 +263,7 @@ public void testMaxClientLagEnabledThresholdAbove() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "11 minutes"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); try { parameterProvider.getCachedMaxClientLagInMs(); Assert.fail("Should not have succeeded"); @@ -318,7 +277,7 @@ public void testMaxClientLagEnableEmptyInput() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, ""); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); try { parameterProvider.getCachedMaxClientLagInMs(); Assert.fail("Should not have succeeded"); @@ -332,20 +291,8 @@ public void testMaxChunksInBlobAndRegistrationRequest() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put("max_chunks_in_blob_and_registration_request", 1); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); - Assert.assertEquals(1, parameterProvider.getMaxChunksInBlobAndRegistrationRequest()); - - parameterProvider = new ParameterProvider(parameterMap, prop, true); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); Assert.assertEquals(1, parameterProvider.getMaxChunksInBlobAndRegistrationRequest()); - - 
SFException e = - Assert.assertThrows( - SFException.class, - () -> { - parameterMap.put("max_chunks_in_blob_and_registration_request", 100); - new ParameterProvider(parameterMap, prop, true); - }); - Assert.assertEquals(e.getVendorCode(), ErrorCode.INVALID_CONFIG_PARAMETER.getMessageCode()); } @Test @@ -356,7 +303,7 @@ public void testValidCompressionAlgorithmsAndWithUppercaseLowerCase() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM, v); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); Assert.assertEquals( Constants.BdecParquetCompression.GZIP, parameterProvider.getBdecParquetCompressionAlgorithm()); @@ -367,7 +314,7 @@ public void testValidCompressionAlgorithmsAndWithUppercaseLowerCase() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM, v); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); Assert.assertEquals( Constants.BdecParquetCompression.ZSTD, parameterProvider.getBdecParquetCompressionAlgorithm()); @@ -379,7 +326,7 @@ public void testInvalidCompressionAlgorithm() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM, "invalid_comp"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); try { parameterProvider.getBdecParquetCompressionAlgorithm(); Assert.fail("Should not have succeeded"); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/RegisterServiceTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/RegisterServiceTest.java index 4eaea15a4..37eb5f96e 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/RegisterServiceTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/RegisterServiceTest.java @@ -14,17 +14,8 @@ import org.junit.Assert; import org.junit.Ignore; import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -@RunWith(Parameterized.class) public class RegisterServiceTest { - @Parameterized.Parameters(name = "isIcebergMode: {0}") - public static Object[] isIcebergMode() { - return new Object[] {false, true}; - } - - @Parameterized.Parameter public boolean isIcebergMode; @Test public void testRegisterService() throws ExecutionException, InterruptedException { @@ -54,7 +45,7 @@ public void testRegisterService() throws ExecutionException, InterruptedExceptio @Test public void testRegisterServiceTimeoutException() throws Exception { SnowflakeStreamingIngestClientInternal client = - new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode); + new SnowflakeStreamingIngestClientInternal<>("client"); RegisterService rs = new RegisterService<>(client, true); Pair, CompletableFuture> blobFuture1 = @@ -82,7 +73,7 @@ public void testRegisterServiceTimeoutException() throws Exception { @Test public void testRegisterServiceTimeoutException_testRetries() throws Exception { SnowflakeStreamingIngestClientInternal client = - new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode); + new 
SnowflakeStreamingIngestClientInternal<>("client"); RegisterService rs = new RegisterService<>(client, true); Pair, CompletableFuture> blobFuture1 = @@ -116,7 +107,7 @@ public void testRegisterServiceTimeoutException_testRetries() throws Exception { @Test public void testRegisterServiceNonTimeoutException() { SnowflakeStreamingIngestClientInternal client = - new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode); + new SnowflakeStreamingIngestClientInternal<>("client"); RegisterService rs = new RegisterService<>(client, true); CompletableFuture future = new CompletableFuture<>(); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java index 5beb0662f..87e3f8f11 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java @@ -44,11 +44,8 @@ import net.snowflake.ingest.utils.Utils; import org.junit.Assert; import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; import org.mockito.Mockito; -@RunWith(Parameterized.class) public class SnowflakeStreamingIngestChannelTest { /** @@ -75,13 +72,6 @@ public long getFreeMemory() { } } - @Parameterized.Parameters(name = "isIcebergMode: {0}") - public static Object[] isIcebergMode() { - return new Object[] {false, true}; - } - - @Parameterized.Parameter public boolean isIcebergMode; - @Test public void testChannelFactoryNullFields() { String name = "CHANNEL"; @@ -91,7 +81,7 @@ public void testChannelFactoryNullFields() { long channelSequencer = 0L; long rowSequencer = 0L; SnowflakeStreamingIngestClientInternal client = - new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode); + new SnowflakeStreamingIngestClientInternal<>("client"); Object[] fields = new Object[] { @@ -133,7 +123,7 @@ public void testChannelFactorySuccess() { long rowSequencer = 0L; SnowflakeStreamingIngestClientInternal client = - new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode); + new SnowflakeStreamingIngestClientInternal<>("client"); SnowflakeStreamingIngestChannelInternal channel = SnowflakeStreamingIngestChannelFactory.builder(name) @@ -168,7 +158,7 @@ public void testChannelFactorySuccess() { @Test public void testChannelValid() { SnowflakeStreamingIngestClientInternal client = - new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode); + new SnowflakeStreamingIngestClientInternal<>("client"); SnowflakeStreamingIngestChannelInternal channel = new SnowflakeStreamingIngestChannelInternal<>( "channel", @@ -218,7 +208,7 @@ public void testChannelValid() { @Test public void testChannelClose() { SnowflakeStreamingIngestClientInternal client = - new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode); + new SnowflakeStreamingIngestClientInternal<>("client"); SnowflakeStreamingIngestChannelInternal channel = new SnowflakeStreamingIngestChannelInternal<>( "channel", @@ -357,7 +347,6 @@ public void testOpenChannelErrorResponse() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, - isIcebergMode, true, requestBuilder, null); @@ -428,7 +417,6 @@ public void testOpenChannelSnowflakeInternalErrorResponse() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, - isIcebergMode, true, requestBuilder, null); @@ -511,7 +499,6 @@ 
public void testOpenChannelSuccessResponse() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, - isIcebergMode, true, requestBuilder, null); @@ -550,9 +537,7 @@ public void testOpenChannelSuccessResponse() throws Exception { @Test public void testInsertRow() { SnowflakeStreamingIngestClientInternal client; - client = - new SnowflakeStreamingIngestClientInternal( - "client_PARQUET", isIcebergMode); + client = new SnowflakeStreamingIngestClientInternal("client_PARQUET"); SnowflakeStreamingIngestChannelInternal channel = new SnowflakeStreamingIngestChannelInternal<>( "channel", @@ -636,8 +621,7 @@ public void testInsertTooLargeRow() { schema.forEach(x -> row.put(x.getName(), byteArrayOneMb)); SnowflakeStreamingIngestClientInternal client; - client = - new SnowflakeStreamingIngestClientInternal("test_client", isIcebergMode); + client = new SnowflakeStreamingIngestClientInternal("test_client"); // Test channel with on error CONTINUE SnowflakeStreamingIngestChannelInternal channel = @@ -721,7 +705,7 @@ public void testInsertRowThrottling() { memoryInfoProvider.maxMemory = maxMemory; SnowflakeStreamingIngestClientInternal client = - new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode); + new SnowflakeStreamingIngestClientInternal<>("client"); SnowflakeStreamingIngestChannelInternal channel = new SnowflakeStreamingIngestChannelInternal<>( "channel", @@ -737,7 +721,7 @@ public void testInsertRowThrottling() { OpenChannelRequest.OnErrorOption.CONTINUE, UTC); - ParameterProvider parameterProvider = new ParameterProvider(isIcebergMode); + ParameterProvider parameterProvider = new ParameterProvider(); memoryInfoProvider.freeMemory = maxMemory * (parameterProvider.getInsertThrottleThresholdInPercentage() - 1) / 100; @@ -767,7 +751,7 @@ public void testInsertRowThrottling() { @Test public void testFlush() throws Exception { SnowflakeStreamingIngestClientInternal client = - Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode)); + Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client")); SnowflakeStreamingIngestChannelInternal channel = new SnowflakeStreamingIngestChannelInternal<>( "channel", @@ -803,7 +787,7 @@ public void testFlush() throws Exception { @Test public void testClose() throws Exception { SnowflakeStreamingIngestClientInternal client = - Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode)); + Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client")); SnowflakeStreamingIngestChannel channel = new SnowflakeStreamingIngestChannelInternal<>( "channel", @@ -837,7 +821,7 @@ public void testClose() throws Exception { @Test public void testDropOnClose() throws Exception { SnowflakeStreamingIngestClientInternal client = - Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode)); + Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client")); SnowflakeStreamingIngestChannelInternal channel = new SnowflakeStreamingIngestChannelInternal<>( "channel", @@ -874,7 +858,7 @@ public void testDropOnClose() throws Exception { @Test public void testDropOnCloseInvalidChannel() throws Exception { SnowflakeStreamingIngestClientInternal client = - Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode)); + Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client")); SnowflakeStreamingIngestChannelInternal channel = new SnowflakeStreamingIngestChannelInternal<>( "channel", @@ -907,7 +891,7 @@ public void 
testDropOnCloseInvalidChannel() throws Exception { public void testGetLatestCommittedOffsetToken() { String offsetToken = "10"; SnowflakeStreamingIngestClientInternal client = - Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode)); + Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client")); SnowflakeStreamingIngestChannel channel = new SnowflakeStreamingIngestChannelInternal<>( "channel", diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java index 4e66d8a15..1693e1520 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java @@ -66,12 +66,9 @@ import org.junit.Before; import org.junit.Ignore; import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; import org.mockito.ArgumentMatchers; import org.mockito.Mockito; -@RunWith(Parameterized.class) public class SnowflakeStreamingIngestClientTest { private static final ObjectMapper objectMapper = new ObjectMapper(); @@ -82,14 +79,6 @@ public class SnowflakeStreamingIngestClientTest { SnowflakeStreamingIngestChannelInternal channel3; SnowflakeStreamingIngestChannelInternal channel4; - // TODO: Add IcebergMode = True after Streaming to Iceberg is supported. - @Parameterized.Parameters(name = "isIcebergMode: {0}") - public static Object[] isIcebergMode() { - return new Object[] {false}; - } - - @Parameterized.Parameter public boolean isIcebergMode; - @Before public void setup() { objectMapper.setVisibility(PropertyAccessor.GETTER, JsonAutoDetect.Visibility.ANY); @@ -177,7 +166,6 @@ public void testConstructorParameters() throws Exception { SnowflakeStreamingIngestClientFactory.builder("client") .setProperties(prop) .setParameterOverrides(parameterMap) - .setIsIceberg(isIcebergMode) .setIsTestMode(true) .build(); @@ -205,7 +193,6 @@ public void testClientFactoryWithJmxMetrics() throws Exception { .setProperties(prop) .setParameterOverrides( Collections.singletonMap(ENABLE_SNOWPIPE_STREAMING_METRICS, true)) - .setIsIceberg(isIcebergMode) .build(); Assert.assertEquals("client", client.getName()); @@ -357,7 +344,6 @@ public void testGetChannelsStatusWithRequest() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, - isIcebergMode, true, requestBuilder, null); @@ -418,7 +404,6 @@ public void testDropChannel() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, - isIcebergMode, true, requestBuilder, null); @@ -464,7 +449,6 @@ public void testGetChannelsStatusWithRequestError() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, - isIcebergMode, true, requestBuilder, null); @@ -693,7 +677,6 @@ public void testGetRetryBlobs() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, - isIcebergMode, true, requestBuilder, null); @@ -735,7 +718,6 @@ public void testRegisterBlobErrorResponse() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, - isIcebergMode, true, requestBuilder, null); @@ -783,7 +765,6 @@ public void testRegisterBlobSnowflakeInternalErrorResponse() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, - isIcebergMode, true, requestBuilder, null); @@ -840,7 +821,6 @@ public void 
testRegisterBlobSuccessResponse() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, - isIcebergMode, true, requestBuilder, null); @@ -925,7 +905,6 @@ public void testRegisterBlobsRetries() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, - isIcebergMode, true, requestBuilder, null); @@ -955,7 +934,6 @@ public void testRegisterBlobChunkLimit() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, - isIcebergMode, true, requestBuilder, null)); @@ -1128,7 +1106,6 @@ public void testRegisterBlobsRetriesSucceeds() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, - isIcebergMode, true, requestBuilder, null); @@ -1203,7 +1180,6 @@ public void testRegisterBlobResponseWithInvalidChannel() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, - isIcebergMode, true, requestBuilder, null); @@ -1254,7 +1230,7 @@ public void testRegisterBlobResponseWithInvalidChannel() throws Exception { @Test public void testFlush() throws Exception { SnowflakeStreamingIngestClientInternal client = - Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode)); + Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client")); ChannelsStatusResponse response = new ChannelsStatusResponse(); response.setStatusCode(0L); response.setMessage("Success"); @@ -1276,7 +1252,7 @@ public void testFlush() throws Exception { @Test public void testClose() throws Exception { SnowflakeStreamingIngestClientInternal client = - Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode)); + Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client")); ChannelsStatusResponse response = new ChannelsStatusResponse(); response.setStatusCode(0L); response.setMessage("Success"); @@ -1310,7 +1286,7 @@ public void testClose() throws Exception { @Test public void testCloseWithError() throws Exception { SnowflakeStreamingIngestClientInternal client = - Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode)); + Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client")); CompletableFuture future = new CompletableFuture<>(); future.completeExceptionally(new Exception("Simulating Error")); @@ -1348,7 +1324,7 @@ public void testCloseWithError() throws Exception { @Test public void testVerifyChannelsAreFullyCommittedSuccess() throws Exception { SnowflakeStreamingIngestClientInternal client = - Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode)); + Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client")); SnowflakeStreamingIngestChannelInternal channel = new SnowflakeStreamingIngestChannelInternal<>( "channel1", @@ -1396,7 +1372,6 @@ public void testFlushServiceException() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, - isIcebergMode, true, requestBuilder, parameterMap); @@ -1433,7 +1408,6 @@ public void testGetLatestCommittedOffsetTokens() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, - isIcebergMode, true, requestBuilder, null); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestStageTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestStageTest.java index d12c6231c..1ba9f98df 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestStageTest.java +++ 
b/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestStageTest.java @@ -276,7 +276,7 @@ public void testRefreshSnowflakeMetadataRemote() throws Exception { Mockito.when(mockResponse.getEntity()).thenReturn(entity); Mockito.when(mockClient.execute(Mockito.any())).thenReturn(mockResponse); - ParameterProvider parameterProvider = new ParameterProvider(false); + ParameterProvider parameterProvider = new ParameterProvider(); StreamingIngestStage stage = new StreamingIngestStage(true, "role", mockClient, mockBuilder, "clientName", 1);
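For reference, a minimal sketch of the client construction path this revert restores: the factory builder no longer carries an Iceberg flag, and parameter overrides plus connection properties are the only inputs. The builder methods (`builder`, `setProperties`, `setParameterOverrides`, `build`) and the `ParameterProvider.MAX_CLIENT_LAG` key are taken from the patch; the wrapper class, the `buildClient` helper, the "MY_CLIENT" name, and the "2 seconds" override value are illustrative placeholders, and real connection properties (account URL, user, key, role) must be supplied by the caller.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClientFactory;
import net.snowflake.ingest.utils.ParameterProvider;

public class PostRevertClientSketch {

  /**
   * Builds a streaming ingest client the way the factory works after this revert:
   * connection properties plus optional parameter overrides, with no Iceberg flag
   * on the builder.
   *
   * @param prop connection properties (url, user, private_key, role, ...) supplied
   *     by the caller, e.g. loaded from a profile file
   */
  static SnowflakeStreamingIngestClient buildClient(Properties prop) {
    // Overrides are keyed by the public ParameterProvider constants; the value here
    // is only an example of the "<n> seconds" format the tests above exercise.
    Map<String, Object> parameterOverrides =
        Collections.singletonMap(ParameterProvider.MAX_CLIENT_LAG, "2 seconds");

    // Builder chain as it stands after the revert: there is no setIsIceberg(...) step,
    // and build() forwards (name, accountURL, prop, parameterOverrides, isTestMode).
    return SnowflakeStreamingIngestClientFactory.builder("MY_CLIENT")
        .setProperties(prop)
        .setParameterOverrides(parameterOverrides)
        .build();
  }
}
```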