From 2ecd4c9cf5c648fd58ed9200fd8b10ed92fea9e0 Mon Sep 17 00:00:00 2001 From: Alec Huang Date: Wed, 23 Oct 2024 16:05:25 -0700 Subject: [PATCH] done --- .../SnowflakeStreamingIngestClientFactory.java | 11 +---------- .../ingest/streaming/internal/ParquetFlusher.java | 9 +++++++-- .../streaming/internal/ParquetRowBuffer.java | 5 ++++- .../SnowflakeStreamingIngestClientInternal.java | 13 +++++-------- .../java/net/snowflake/ingest/utils/Constants.java | 3 +++ .../snowflake/ingest/utils/ParameterProvider.java | 6 +++++- .../ingest/streaming/internal/BlobBuilderTest.java | 1 + .../streaming/internal/ChannelCacheTest.java | 6 +++++- .../internal/InsertRowsBenchmarkTest.java | 13 +++++-------- .../ingest/streaming/internal/OAuthBasicTest.java | 2 +- .../streaming/internal/RegisterServiceTest.java | 6 +++++- .../ingest/streaming/internal/RowBufferTest.java | 6 +++++- .../SnowflakeStreamingIngestChannelTest.java | 9 ++++++--- .../SnowflakeStreamingIngestClientTest.java | 14 +++++++------- .../internal/datatypes/AbstractDataTypeTest.java | 3 ++- 15 files changed, 62 insertions(+), 45 deletions(-) diff --git a/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClientFactory.java b/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClientFactory.java index e77279ab9..1c87167a4 100644 --- a/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClientFactory.java +++ b/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClientFactory.java @@ -31,9 +31,6 @@ public static class Builder { // Indicates whether it's under test mode private boolean isTestMode; - // Whether we are going to ingest into iceberg tables - private boolean isIceberg; - private Builder(String name) { this.name = name; } @@ -53,12 +50,6 @@ public Builder setIsTestMode(boolean isTestMode) { return this; } - // do not make public until the feature is ready - Builder setIsIceberg(boolean isIceberg) { - this.isIceberg = isIceberg; - return this; - } - public SnowflakeStreamingIngestClient build() { Utils.assertStringNotNullOrEmpty("client name", this.name); Utils.assertNotNull("connection properties", this.prop); @@ -67,7 +58,7 @@ public SnowflakeStreamingIngestClient build() { SnowflakeURL accountURL = new SnowflakeURL(prop.getProperty(Constants.ACCOUNT_URL)); return new SnowflakeStreamingIngestClientInternal<>( - this.name, accountURL, prop, this.parameterOverrides, this.isIceberg, this.isTestMode); + this.name, accountURL, prop, this.parameterOverrides, this.isTestMode); } } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java index e7272d94a..6fc5954d6 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java @@ -33,6 +33,7 @@ public class ParquetFlusher implements Flusher { private final Constants.BdecParquetCompression bdecParquetCompression; private final ParquetProperties.WriterVersion parquetWriterVersion; private final boolean enableDictionaryEncoding; + private final boolean isIcebergMode; /** Construct parquet flusher from its schema. */ public ParquetFlusher( @@ -41,13 +42,15 @@ public ParquetFlusher( Optional maxRowGroups, Constants.BdecParquetCompression bdecParquetCompression, ParquetProperties.WriterVersion parquetWriterVersion, - boolean enableDictionaryEncoding) { + boolean enableDictionaryEncoding, + boolean isIcebergMode) { this.schema = schema; this.maxChunkSizeInBytes = maxChunkSizeInBytes; this.maxRowGroups = maxRowGroups; this.bdecParquetCompression = bdecParquetCompression; this.parquetWriterVersion = parquetWriterVersion; this.enableDictionaryEncoding = enableDictionaryEncoding; + this.isIcebergMode = isIcebergMode; } @Override @@ -127,7 +130,9 @@ private SerializationResult serializeFromJavaObjects( // We insert the filename in the file itself as metadata so that streams can work on replicated // mixed tables. For a more detailed discussion on the topic see SNOW-561447 and // http://go/streams-on-replicated-mixed-tables - metadata.put(Constants.PRIMARY_FILE_ID_KEY, StreamingIngestUtils.getShortname(filePath)); + metadata.put( + isIcebergMode ? Constants.FULL_FILL_NAME_KEY : Constants.PRIMARY_FILE_ID_KEY, + StreamingIngestUtils.getShortname(filePath)); parquetWriter = new SnowflakeParquetWriter( mergedData, diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java index 96a1156ce..3df7cb4a7 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java @@ -456,6 +456,9 @@ public Flusher createFlusher() { clientBufferParameters.getMaxRowGroups(), clientBufferParameters.getBdecParquetCompression(), parquetWriterVersion, - clientBufferParameters.isEnableDictionaryEncoding()); + parquetWriterVersion == ParquetProperties.WriterVersion.PARQUET_2_0 + && clientBufferParameters + .isEnableDictionaryEncoding() /* writer 1.0 does not support dictionary encoding*/, + clientBufferParameters.getIsIcebergMode()); } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java index f553fb7ff..691fbfb6a 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java @@ -153,7 +153,6 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea * @param prop connection properties * @param httpClient http client for sending request * @param isTestMode whether we're under test mode - * @param isIcebergMode whether we're streaming to Iceberg tables * @param requestBuilder http request builder * @param parameterOverrides parameters we override in case we want to set different values */ @@ -162,16 +161,16 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea SnowflakeURL accountURL, Properties prop, CloseableHttpClient httpClient, - boolean isIcebergMode, boolean isTestMode, RequestBuilder requestBuilder, Map parameterOverrides) { + this.isIcebergMode = + prop != null && Boolean.parseBoolean(prop.getProperty(Constants.STREAMING_ICEBERG)); this.parameterProvider = new ParameterProvider(parameterOverrides, prop, isIcebergMode); this.internalParameterProvider = new InternalParameterProvider(isIcebergMode); this.name = name; String accountName = accountURL == null ? null : accountURL.getAccount(); - this.isIcebergMode = isIcebergMode; this.isTestMode = isTestMode; this.httpClient = httpClient == null ? HttpUtil.getHttpClient(accountName) : httpClient; this.channelCache = new ChannelCache<>(); @@ -267,7 +266,6 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea * @param accountURL Snowflake account url * @param prop connection properties * @param parameterOverrides map of parameters to override for this client - * @param isIcebergMode whether we're streaming to Iceberg tables * @param isTestMode indicates whether it's under test mode */ public SnowflakeStreamingIngestClientInternal( @@ -275,17 +273,16 @@ public SnowflakeStreamingIngestClientInternal( SnowflakeURL accountURL, Properties prop, Map parameterOverrides, - boolean isIcebergMode, boolean isTestMode) { - this(name, accountURL, prop, null, isIcebergMode, isTestMode, null, parameterOverrides); + this(name, accountURL, prop, null, isTestMode, null, parameterOverrides); } /*** Constructor for TEST ONLY * * @param name the name of the client */ - SnowflakeStreamingIngestClientInternal(String name, boolean isIcebergMode) { - this(name, null, null, null, isIcebergMode, true, null, new HashMap<>()); + SnowflakeStreamingIngestClientInternal(String name) { + this(name, null, null, null, true, null, new HashMap<>()); } // TESTING ONLY - inject the request builder diff --git a/src/main/java/net/snowflake/ingest/utils/Constants.java b/src/main/java/net/snowflake/ingest/utils/Constants.java index a5e04e21e..deead98c6 100644 --- a/src/main/java/net/snowflake/ingest/utils/Constants.java +++ b/src/main/java/net/snowflake/ingest/utils/Constants.java @@ -33,9 +33,12 @@ public class Constants { public static final String OAUTH_CLIENT_SECRET = "oauth_client_secret"; public static final String OAUTH_REFRESH_TOKEN = "oauth_refresh_token"; public static final String OAUTH_TOKEN_ENDPOINT = "oauth_token_endpoint"; + public static final String STREAMING_ICEBERG = "streaming_iceberg"; public static final String SNOWFLAKE_OAUTH_TOKEN_ENDPOINT = "/oauth/token-request"; public static final String PRIMARY_FILE_ID_KEY = "primaryFileId"; // Don't change, should match Parquet Scanner + public static final String FULL_FILL_NAME_KEY = + "fullFillNameKey"; // Don't change, should match Parquet Scanner public static final long RESPONSE_SUCCESS = 0L; // Don't change, should match server side public static final long RESPONSE_ERR_GENERAL_EXCEPTION_RETRY_REQUEST = 10L; // Don't change, should match server side diff --git a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java index b86e59525..d104dfccc 100644 --- a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java +++ b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java @@ -73,6 +73,8 @@ public class ParameterProvider { public static final Constants.BdecParquetCompression BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT = Constants.BdecParquetCompression.GZIP; + public static final Constants.BdecParquetCompression + PARQUET_COMPRESSION_ALGORITHM_ICEBERG_MODE_DEFAULT = Constants.BdecParquetCompression.ZSTD; /* Iceberg mode parameters: When streaming to Iceberg mode, different default parameters are required because it generates Parquet files instead of BDEC files. */ public static final int MAX_CHUNKS_IN_BLOB_ICEBERG_MODE_DEFAULT = 1; @@ -253,7 +255,9 @@ private void setParameterMap( this.checkAndUpdate( BDEC_PARQUET_COMPRESSION_ALGORITHM, - BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT, + isIcebergMode + ? BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT + : PARQUET_COMPRESSION_ALGORITHM_ICEBERG_MODE_DEFAULT, parameterOverrides, props, false /* enforceDefault */); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java index 9fc585483..2ef4885a9 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java @@ -136,6 +136,7 @@ private List> createChannelDataPerTable(int metada isIceberg ? ParquetProperties.WriterVersion.PARQUET_2_0 : ParquetProperties.WriterVersion.PARQUET_1_0, + isIceberg, isIceberg)) .when(channelData) .createFlusher(); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java index d9a207ee8..4ade88188 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java @@ -9,10 +9,12 @@ import java.util.HashMap; import java.util.Iterator; import java.util.Map; +import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; import net.snowflake.client.jdbc.internal.apache.http.impl.client.CloseableHttpClient; import net.snowflake.ingest.connection.RequestBuilder; import net.snowflake.ingest.streaming.OpenChannelRequest; +import net.snowflake.ingest.utils.Constants; import org.apache.parquet.column.ParquetProperties; import org.junit.Assert; import org.junit.Before; @@ -45,9 +47,11 @@ public void setup() { cache = new ChannelCache<>(); CloseableHttpClient httpClient = MockSnowflakeServiceClient.createHttpClient(); RequestBuilder requestBuilder = MockSnowflakeServiceClient.createRequestBuilder(httpClient); + Properties prop = new Properties(); + prop.setProperty(Constants.STREAMING_ICEBERG, String.valueOf(isIcebergMode)); client = new SnowflakeStreamingIngestClientInternal<>( - "client", null, null, httpClient, isIcebergMode, true, requestBuilder, new HashMap<>()); + "client", null, prop, httpClient, true, requestBuilder, new HashMap<>()); channel1 = new SnowflakeStreamingIngestChannelInternal<>( diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/InsertRowsBenchmarkTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/InsertRowsBenchmarkTest.java index b2b81adad..85f670bfc 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/InsertRowsBenchmarkTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/InsertRowsBenchmarkTest.java @@ -9,11 +9,13 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Properties; import java.util.concurrent.TimeUnit; import net.snowflake.client.jdbc.internal.apache.http.impl.client.CloseableHttpClient; import net.snowflake.ingest.connection.RequestBuilder; import net.snowflake.ingest.streaming.InsertValidationResponse; import net.snowflake.ingest.streaming.OpenChannelRequest; +import net.snowflake.ingest.utils.Constants; import net.snowflake.ingest.utils.Utils; import org.apache.parquet.column.ParquetProperties; import org.junit.Assert; @@ -55,16 +57,11 @@ public void setUpBeforeAll() { // SNOW-1490151: Testing gaps CloseableHttpClient httpClient = MockSnowflakeServiceClient.createHttpClient(); RequestBuilder requestBuilder = MockSnowflakeServiceClient.createRequestBuilder(httpClient); + Properties prop = new Properties(); + prop.setProperty(Constants.STREAMING_ICEBERG, String.valueOf(isIcebergMode)); client = new SnowflakeStreamingIngestClientInternal<>( - "client_PARQUET", - null, - null, - httpClient, - isIcebergMode, - true, - requestBuilder, - new HashMap<>()); + "client_PARQUET", null, prop, httpClient, true, requestBuilder, new HashMap<>()); channel = new SnowflakeStreamingIngestChannelInternal<>( diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/OAuthBasicTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/OAuthBasicTest.java index 688ec0c91..0f922b1d0 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/OAuthBasicTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/OAuthBasicTest.java @@ -119,7 +119,7 @@ public void testCreateOAuthClient() throws Exception { public void testSetRefreshToken() throws Exception { // SNOW-1490151: Testing gaps SnowflakeStreamingIngestClientInternal client = - new SnowflakeStreamingIngestClientInternal<>("TEST_CLIENT", false); + new SnowflakeStreamingIngestClientInternal<>("TEST_CLIENT"); MockOAuthClient mockOAuthClient = new MockOAuthClient(); OAuthManager oAuthManager = diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/RegisterServiceTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/RegisterServiceTest.java index f4c152baa..f507c056a 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/RegisterServiceTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/RegisterServiceTest.java @@ -11,12 +11,14 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Properties; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import net.snowflake.client.jdbc.internal.apache.http.impl.client.CloseableHttpClient; import net.snowflake.ingest.connection.RequestBuilder; +import net.snowflake.ingest.utils.Constants; import net.snowflake.ingest.utils.Pair; import org.junit.After; import org.junit.Assert; @@ -41,9 +43,11 @@ public static Object[] isIcebergMode() { public void setup() { CloseableHttpClient httpClient = MockSnowflakeServiceClient.createHttpClient(); RequestBuilder requestBuilder = MockSnowflakeServiceClient.createRequestBuilder(httpClient); + Properties prop = new Properties(); + prop.setProperty(Constants.STREAMING_ICEBERG, String.valueOf(isIcebergMode)); client = new SnowflakeStreamingIngestClientInternal<>( - "client", null, null, httpClient, isIcebergMode, true, requestBuilder, new HashMap<>()); + "client", null, prop, httpClient, true, requestBuilder, new HashMap<>()); } @After diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java index 246753fbe..8edfe40cb 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java @@ -2026,7 +2026,11 @@ public void testParquetFileNameMetadata() throws IOException { flusher.serialize(Collections.singletonList(data), filePath); BdecParquetReader reader = new BdecParquetReader(result.chunkData.toByteArray()); - Assert.assertEquals(filePath, reader.getKeyValueMetadata().get(Constants.PRIMARY_FILE_ID_KEY)); + Assert.assertEquals( + filePath, + reader + .getKeyValueMetadata() + .get(isIcebergMode ? Constants.FULL_FILL_NAME_KEY : Constants.PRIMARY_FILE_ID_KEY)); } @Test diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java index 3c0bdfc2b..6fbced668 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java @@ -89,14 +89,18 @@ public static Object[] isIcebergMode() { private SnowflakeStreamingIngestClientInternal client; private MockSnowflakeServiceClient.ApiOverride apiOverride; + private Properties prop; + @Before public void setup() { apiOverride = new MockSnowflakeServiceClient.ApiOverride(); CloseableHttpClient httpClient = MockSnowflakeServiceClient.createHttpClient(apiOverride); RequestBuilder requestBuilder = MockSnowflakeServiceClient.createRequestBuilder(httpClient); + prop = new Properties(); + prop.setProperty(Constants.STREAMING_ICEBERG, String.valueOf(isIcebergMode)); client = new SnowflakeStreamingIngestClientInternal<>( - "client", null, null, httpClient, isIcebergMode, true, requestBuilder, new HashMap<>()); + "client", null, prop, httpClient, true, requestBuilder, new HashMap<>()); // some tests assume client is a mock object, just do it for everyone. client = Mockito.spy(client); @@ -501,9 +505,8 @@ public void testOpenChannelSuccessResponse() throws Exception { new SnowflakeStreamingIngestClientInternal<>( "client", new SnowflakeURL("snowflake.dev.local:8082"), - null, + prop, httpClient, - isIcebergMode, true, requestBuilder, null); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java index e3f563fb6..ccce30520 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java @@ -100,6 +100,7 @@ public static Object[] isIcebergMode() { SnowflakeStreamingIngestClientInternal client; private MockSnowflakeServiceClient.ApiOverride apiOverride; RequestBuilder requestBuilder; + private Properties isIcebergProp; @Before public void setup() throws Exception { @@ -110,13 +111,15 @@ public void setup() throws Exception { prop.put(ACCOUNT_URL, TestUtils.getHost()); prop.put(PRIVATE_KEY, TestUtils.getPrivateKey()); prop.put(ROLE, TestUtils.getRole()); + isIcebergProp = new Properties(); + isIcebergProp.setProperty(Constants.STREAMING_ICEBERG, String.valueOf(isIcebergMode)); apiOverride = new MockSnowflakeServiceClient.ApiOverride(); CloseableHttpClient httpClient = MockSnowflakeServiceClient.createHttpClient(apiOverride); requestBuilder = Mockito.spy(MockSnowflakeServiceClient.createRequestBuilder(httpClient)); client = new SnowflakeStreamingIngestClientInternal<>( - "client", null, null, httpClient, isIcebergMode, true, requestBuilder, new HashMap<>()); + "client", null, isIcebergProp, httpClient, true, requestBuilder, new HashMap<>()); channel1 = new SnowflakeStreamingIngestChannelInternal<>( @@ -472,9 +475,8 @@ public void testRegisterBlobRequestCreationSuccess() throws Exception { new SnowflakeStreamingIngestClientInternal<>( "client", new SnowflakeURL("snowflake.dev.local:8082"), - null, + isIcebergProp, httpClient, - isIcebergMode, true, requestBuilder, null); @@ -1001,9 +1003,8 @@ public void testRegisterBlobsRetriesSucceeds() throws Exception { new SnowflakeStreamingIngestClientInternal<>( "client", new SnowflakeURL("snowflake.dev.local:8082"), - null, + isIcebergProp, httpClient, - isIcebergMode, true, requestBuilder, null); @@ -1241,9 +1242,8 @@ public void testFlushServiceException() throws Exception { new SnowflakeStreamingIngestClientInternal<>( "client", new SnowflakeURL("snowflake.dev.local:8082"), - null, + isIcebergProp, httpClient, - isIcebergMode, true, requestBuilder, parameterMap); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java index baa370e16..53c31a8b7 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java @@ -122,6 +122,7 @@ protected void setUp( conn.createStatement().execute(String.format("use warehouse %s;", TestUtils.getWarehouse())); Properties props = TestUtils.getProperties(Constants.BdecVersion.THREE, false); + props.setProperty(Constants.STREAMING_ICEBERG, String.valueOf(isIceberg)); if (props.getProperty(ROLE).equals("DEFAULT_ROLE")) { props.setProperty(ROLE, "ACCOUNTADMIN"); } @@ -135,7 +136,7 @@ protected void setUp( SnowflakeURL accountURL = new SnowflakeURL(prop.getProperty(Constants.ACCOUNT_URL)); client = new SnowflakeStreamingIngestClientInternal<>( - "client1", accountURL, prop, parameterMap, isIceberg, false); + "client1", accountURL, prop, parameterMap, false); } @After