From dbe021a1c868dfb23915d3983f5842a04f63dd57 Mon Sep 17 00:00:00 2001 From: Alec Huang Date: Thu, 24 Oct 2024 17:07:11 -0700 Subject: [PATCH] address comments --- .../internal/ClientBufferParameters.java | 4 +- .../streaming/internal/ParquetFlusher.java | 9 +-- .../streaming/internal/ParquetRowBuffer.java | 5 +- ...nowflakeStreamingIngestClientInternal.java | 27 +++------ .../net/snowflake/ingest/utils/Constants.java | 3 - .../ingest/utils/ParameterProvider.java | 56 +++++++++++-------- .../streaming/internal/BlobBuilderTest.java | 1 - .../streaming/internal/ChannelCacheTest.java | 4 +- .../streaming/internal/FlushServiceTest.java | 2 +- .../internal/InsertRowsBenchmarkTest.java | 4 +- .../internal/ParameterProviderTest.java | 5 +- .../internal/RegisterServiceTest.java | 4 +- .../streaming/internal/RowBufferTest.java | 6 +- .../SnowflakeStreamingIngestChannelTest.java | 2 +- .../SnowflakeStreamingIngestClientTest.java | 2 +- .../datatypes/AbstractDataTypeTest.java | 2 +- 16 files changed, 59 insertions(+), 77 deletions(-) diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java b/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java index 36a6db282..d17c98ab2 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java @@ -75,8 +75,8 @@ public ClientBufferParameters(SnowflakeStreamingIngestClientInternal clientInter : ParameterProvider.ENABLE_NEW_JSON_PARSING_LOGIC_DEFAULT; this.isIcebergMode = clientInternal != null - ? clientInternal.isIcebergMode() - : ParameterProvider.IS_ICEBERG_MODE_DEFAULT; + ? clientInternal.getParameterProvider().isIcebergMode() + : ParameterProvider.STREAMING_ICEBERG_DEFAULT; this.maxRowGroups = isIcebergMode ? Optional.of(InternalParameterProvider.MAX_ROW_GROUP_COUNT_ICEBERG_MODE_DEFAULT) diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java index 6fc5954d6..e7272d94a 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java @@ -33,7 +33,6 @@ public class ParquetFlusher implements Flusher { private final Constants.BdecParquetCompression bdecParquetCompression; private final ParquetProperties.WriterVersion parquetWriterVersion; private final boolean enableDictionaryEncoding; - private final boolean isIcebergMode; /** Construct parquet flusher from its schema. */ public ParquetFlusher( @@ -42,15 +41,13 @@ public ParquetFlusher( Optional maxRowGroups, Constants.BdecParquetCompression bdecParquetCompression, ParquetProperties.WriterVersion parquetWriterVersion, - boolean enableDictionaryEncoding, - boolean isIcebergMode) { + boolean enableDictionaryEncoding) { this.schema = schema; this.maxChunkSizeInBytes = maxChunkSizeInBytes; this.maxRowGroups = maxRowGroups; this.bdecParquetCompression = bdecParquetCompression; this.parquetWriterVersion = parquetWriterVersion; this.enableDictionaryEncoding = enableDictionaryEncoding; - this.isIcebergMode = isIcebergMode; } @Override @@ -130,9 +127,7 @@ private SerializationResult serializeFromJavaObjects( // We insert the filename in the file itself as metadata so that streams can work on replicated // mixed tables. For a more detailed discussion on the topic see SNOW-561447 and // http://go/streams-on-replicated-mixed-tables - metadata.put( - isIcebergMode ? Constants.FULL_FILL_NAME_KEY : Constants.PRIMARY_FILE_ID_KEY, - StreamingIngestUtils.getShortname(filePath)); + metadata.put(Constants.PRIMARY_FILE_ID_KEY, StreamingIngestUtils.getShortname(filePath)); parquetWriter = new SnowflakeParquetWriter( mergedData, diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java index b0bab8134..a053ff6ec 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java @@ -458,9 +458,6 @@ public Flusher createFlusher() { clientBufferParameters.getMaxRowGroups(), clientBufferParameters.getBdecParquetCompression(), parquetWriterVersion, - parquetWriterVersion == ParquetProperties.WriterVersion.PARQUET_2_0 - && clientBufferParameters - .isEnableDictionaryEncoding() /* writer 1.0 does not support dictionary encoding*/, - clientBufferParameters.getIsIcebergMode()); + clientBufferParameters.isEnableDictionaryEncoding()); } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java index 37706a555..792228721 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java @@ -115,9 +115,6 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea // Indicates whether the client has closed private volatile boolean isClosed; - // Indicates wheter the client is streaming to Iceberg tables - private final boolean isIcebergMode; - // Indicates whether the client is under test mode private final boolean isTestMode; @@ -164,10 +161,9 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea boolean isTestMode, RequestBuilder requestBuilder, Map parameterOverrides) { - this.isIcebergMode = - prop != null && Boolean.parseBoolean(prop.getProperty(Constants.STREAMING_ICEBERG)); - this.parameterProvider = new ParameterProvider(parameterOverrides, prop, isIcebergMode); - this.internalParameterProvider = new InternalParameterProvider(isIcebergMode); + this.parameterProvider = new ParameterProvider(parameterOverrides, prop); + this.internalParameterProvider = + new InternalParameterProvider(parameterProvider.isIcebergMode()); this.name = name; String accountName = accountURL == null ? null : accountURL.getAccount(); @@ -236,7 +232,7 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea this.snowflakeServiceClient = new SnowflakeServiceClient(this.httpClient, this.requestBuilder); this.storageManager = - isIcebergMode + parameterProvider.isIcebergMode() ? new SubscopedTokenExternalVolumeManager( this.role, this.name, this.snowflakeServiceClient) : new InternalStageManager( @@ -346,14 +342,14 @@ public SnowflakeStreamingIngestChannelInternal openChannel(OpenChannelRequest request.getTableName(), request.getChannelName(), Constants.WriteMode.CLOUD_STORAGE, - this.isIcebergMode, + this.parameterProvider.isIcebergMode(), request.getOffsetToken()); response = snowflakeServiceClient.openChannel(openChannelRequest); } catch (IOException | IngestResponseException e) { throw new SFException(e, ErrorCode.OPEN_CHANNEL_FAILURE, e.getMessage()); } - if (isIcebergMode) { + if (parameterProvider.isIcebergMode()) { if (response.getTableColumns().stream().anyMatch(c -> c.getSourceIcebergDataType() == null)) { throw new SFException( ErrorCode.INTERNAL_ERROR, "Iceberg table columns must have sourceIcebergDataType set"); @@ -391,7 +387,7 @@ public SnowflakeStreamingIngestChannelInternal openChannel(OpenChannelRequest .setDefaultTimezone(request.getDefaultTimezone()) .setOffsetTokenVerificationFunction(request.getOffsetTokenVerificationFunction()) .setParquetWriterVersion( - isIcebergMode + parameterProvider.isIcebergMode() ? Constants.IcebergSerializationPolicy.valueOf( response.getIcebergSerializationPolicy()) .toParquetWriterVersion() @@ -431,7 +427,7 @@ public void dropChannel(DropChannelRequest request) { request.getSchemaName(), request.getTableName(), request.getChannelName(), - this.isIcebergMode, + this.parameterProvider.isIcebergMode(), request instanceof DropChannelVersionRequest ? ((DropChannelVersionRequest) request).getClientSequencer() : null); @@ -591,7 +587,7 @@ void registerBlobs(List blobs, final int executionCount) { this.storageManager.getClientPrefix() + "_" + counter.getAndIncrement(), this.role, blobs, - this.isIcebergMode); + this.parameterProvider.isIcebergMode()); response = snowflakeServiceClient.registerBlob(request, executionCount); } catch (IOException | IngestResponseException e) { throw new SFException(e, ErrorCode.REGISTER_BLOB_FAILURE, e.getMessage()); @@ -938,11 +934,6 @@ public void setRefreshToken(String refreshToken) { } } - /** Return whether the client is streaming to Iceberg tables */ - boolean isIcebergMode() { - return isIcebergMode; - } - /** * Registers the performance metrics along with JVM memory and Threads. * diff --git a/src/main/java/net/snowflake/ingest/utils/Constants.java b/src/main/java/net/snowflake/ingest/utils/Constants.java index 2f61bca23..754c81cff 100644 --- a/src/main/java/net/snowflake/ingest/utils/Constants.java +++ b/src/main/java/net/snowflake/ingest/utils/Constants.java @@ -33,12 +33,9 @@ public class Constants { public static final String OAUTH_CLIENT_SECRET = "oauth_client_secret"; public static final String OAUTH_REFRESH_TOKEN = "oauth_refresh_token"; public static final String OAUTH_TOKEN_ENDPOINT = "oauth_token_endpoint"; - public static final String STREAMING_ICEBERG = "streaming_iceberg"; public static final String SNOWFLAKE_OAUTH_TOKEN_ENDPOINT = "/oauth/token-request"; public static final String PRIMARY_FILE_ID_KEY = "primaryFileId"; // Don't change, should match Parquet Scanner - public static final String FULL_FILL_NAME_KEY = - "fullFillNameKey"; // Don't change, should match Parquet Scanner public static final long RESPONSE_SUCCESS = 0L; // Don't change, should match server side public static final long RESPONSE_ERR_GENERAL_EXCEPTION_RETRY_REQUEST = 10L; // Don't change, should match server side diff --git a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java index d104dfccc..734788df6 100644 --- a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java +++ b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java @@ -47,6 +47,8 @@ public class ParameterProvider { public static final String ENABLE_NEW_JSON_PARSING_LOGIC = "ENABLE_NEW_JSON_PARSING_LOGIC".toLowerCase(); + public static final String STREAMING_ICEBERG = "STREAMING_ICEBERG".toLowerCase(); + // Default values public static final long BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS_DEFAULT = 100; public static final long INSERT_THROTTLE_INTERVAL_IN_MILLIS_DEFAULT = 1000; @@ -73,22 +75,17 @@ public class ParameterProvider { public static final Constants.BdecParquetCompression BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT = Constants.BdecParquetCompression.GZIP; - public static final Constants.BdecParquetCompression - PARQUET_COMPRESSION_ALGORITHM_ICEBERG_MODE_DEFAULT = Constants.BdecParquetCompression.ZSTD; /* Iceberg mode parameters: When streaming to Iceberg mode, different default parameters are required because it generates Parquet files instead of BDEC files. */ public static final int MAX_CHUNKS_IN_BLOB_ICEBERG_MODE_DEFAULT = 1; public static final boolean ENABLE_NEW_JSON_PARSING_LOGIC_DEFAULT = true; - public static final boolean IS_ICEBERG_MODE_DEFAULT = false; + public static final boolean STREAMING_ICEBERG_DEFAULT = false; /** Map of parameter name to parameter value. This will be set by client/configure API Call. */ private final Map parameterMap = new HashMap<>(); - /* Iceberg mode flag */ - private final boolean isIcebergMode; - // Cached buffer flush interval - avoid parsing each time for quick lookup private Long cachedBufferFlushIntervalMs = -1L; @@ -98,18 +95,23 @@ public class ParameterProvider { * * @param parameterOverrides Map of parameter name to value * @param props Properties from profile file - * @param isIcebergMode If the provided parameters need to be verified and modified to meet - * Iceberg mode */ + public ParameterProvider(Map parameterOverrides, Properties props) { + this.setParameterMap(parameterOverrides, props); + } + + /** Constructor for tests */ public ParameterProvider( Map parameterOverrides, Properties props, boolean isIcebergMode) { - this.isIcebergMode = isIcebergMode; - this.setParameterMap(parameterOverrides, props, isIcebergMode); + if (parameterOverrides != null) { + parameterOverrides.put(STREAMING_ICEBERG, isIcebergMode); + } + this.setParameterMap(parameterOverrides, props); } /** Empty constructor for tests */ public ParameterProvider(boolean isIcebergMode) { - this(null, null, isIcebergMode); + this(new HashMap<>(), null, isIcebergMode); } private void checkAndUpdate( @@ -139,11 +141,8 @@ private void checkAndUpdate( * * @param parameterOverrides Map of parameter name -> value * @param props Properties file provided to client constructor - * @param isIcebergMode If the provided parameters need to be verified and modified to meet - * Iceberg mode */ - private void setParameterMap( - Map parameterOverrides, Properties props, boolean isIcebergMode) { + private void setParameterMap(Map parameterOverrides, Properties props) { // BUFFER_FLUSH_INTERVAL_IN_MILLIS is deprecated and disallowed if ((parameterOverrides != null && parameterOverrides.containsKey(BUFFER_FLUSH_INTERVAL_IN_MILLIS)) @@ -154,6 +153,13 @@ private void setParameterMap( BUFFER_FLUSH_INTERVAL_IN_MILLIS, MAX_CLIENT_LAG)); } + this.checkAndUpdate( + STREAMING_ICEBERG, + STREAMING_ICEBERG_DEFAULT, + parameterOverrides, + props, + false /* enforceDefault */); + this.checkAndUpdate( BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS, BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS_DEFAULT, @@ -234,17 +240,17 @@ private void setParameterMap( this.checkAndUpdate( MAX_CLIENT_LAG, - isIcebergMode ? MAX_CLIENT_LAG_ICEBERG_MODE_DEFAULT : MAX_CLIENT_LAG_DEFAULT, + isIcebergMode() ? MAX_CLIENT_LAG_ICEBERG_MODE_DEFAULT : MAX_CLIENT_LAG_DEFAULT, parameterOverrides, props, false /* enforceDefault */); this.checkAndUpdate( MAX_CHUNKS_IN_BLOB, - isIcebergMode ? MAX_CHUNKS_IN_BLOB_ICEBERG_MODE_DEFAULT : MAX_CHUNKS_IN_BLOB_DEFAULT, + isIcebergMode() ? MAX_CHUNKS_IN_BLOB_ICEBERG_MODE_DEFAULT : MAX_CHUNKS_IN_BLOB_DEFAULT, parameterOverrides, props, - isIcebergMode); + isIcebergMode()); this.checkAndUpdate( MAX_CHUNKS_IN_REGISTRATION_REQUEST, @@ -255,9 +261,7 @@ private void setParameterMap( this.checkAndUpdate( BDEC_PARQUET_COMPRESSION_ALGORITHM, - isIcebergMode - ? BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT - : PARQUET_COMPRESSION_ALGORITHM_ICEBERG_MODE_DEFAULT, + BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT, parameterOverrides, props, false /* enforceDefault */); @@ -292,7 +296,7 @@ private long getMaxClientLagInMs() { Object val = this.parameterMap.getOrDefault( MAX_CLIENT_LAG, - isIcebergMode ? MAX_CLIENT_LAG_ICEBERG_MODE_DEFAULT : MAX_CLIENT_LAG_DEFAULT); + isIcebergMode() ? MAX_CLIENT_LAG_ICEBERG_MODE_DEFAULT : MAX_CLIENT_LAG_DEFAULT); long computedLag; if (val instanceof String) { String maxLag = (String) val; @@ -472,7 +476,7 @@ public int getMaxChunksInBlob() { Object val = this.parameterMap.getOrDefault( MAX_CHUNKS_IN_BLOB, - isIcebergMode ? MAX_CHUNKS_IN_BLOB_ICEBERG_MODE_DEFAULT : MAX_CHUNKS_IN_BLOB_DEFAULT); + isIcebergMode() ? MAX_CHUNKS_IN_BLOB_ICEBERG_MODE_DEFAULT : MAX_CHUNKS_IN_BLOB_DEFAULT); return (val instanceof String) ? Integer.parseInt(val.toString()) : (int) val; } @@ -503,6 +507,12 @@ public boolean isEnableNewJsonParsingLogic() { return (val instanceof String) ? Boolean.parseBoolean(val.toString()) : (boolean) val; } + /** @return Whether the client is in Iceberg mode */ + public boolean isIcebergMode() { + Object val = this.parameterMap.getOrDefault(STREAMING_ICEBERG, STREAMING_ICEBERG_DEFAULT); + return (val instanceof String) ? Boolean.parseBoolean(val.toString()) : (boolean) val; + } + @Override public String toString() { return "ParameterProvider{" + "parameterMap=" + parameterMap + '}'; diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java index 2ef4885a9..9fc585483 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java @@ -136,7 +136,6 @@ private List> createChannelDataPerTable(int metada isIceberg ? ParquetProperties.WriterVersion.PARQUET_2_0 : ParquetProperties.WriterVersion.PARQUET_1_0, - isIceberg, isIceberg)) .when(channelData) .createFlusher(); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java index 4ade88188..0a948dbc3 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java @@ -14,7 +14,7 @@ import net.snowflake.client.jdbc.internal.apache.http.impl.client.CloseableHttpClient; import net.snowflake.ingest.connection.RequestBuilder; import net.snowflake.ingest.streaming.OpenChannelRequest; -import net.snowflake.ingest.utils.Constants; +import net.snowflake.ingest.utils.ParameterProvider; import org.apache.parquet.column.ParquetProperties; import org.junit.Assert; import org.junit.Before; @@ -48,7 +48,7 @@ public void setup() { CloseableHttpClient httpClient = MockSnowflakeServiceClient.createHttpClient(); RequestBuilder requestBuilder = MockSnowflakeServiceClient.createRequestBuilder(httpClient); Properties prop = new Properties(); - prop.setProperty(Constants.STREAMING_ICEBERG, String.valueOf(isIcebergMode)); + prop.setProperty(ParameterProvider.STREAMING_ICEBERG, String.valueOf(isIcebergMode)); client = new SnowflakeStreamingIngestClientInternal<>( "client", null, prop, httpClient, true, requestBuilder, new HashMap<>()); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java index 86350e03d..ad8cda3fe 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java @@ -130,7 +130,7 @@ private abstract static class TestContext implements AutoCloseable { } void setParameterOverride(Map parameterOverride) { - this.parameterProvider = new ParameterProvider(parameterOverride, null, isIcebergMode); + this.parameterProvider = new ParameterProvider(parameterOverride, null); } ChannelData flushChannel(String name) { diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/InsertRowsBenchmarkTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/InsertRowsBenchmarkTest.java index 85f670bfc..3e0ad8a6a 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/InsertRowsBenchmarkTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/InsertRowsBenchmarkTest.java @@ -15,7 +15,7 @@ import net.snowflake.ingest.connection.RequestBuilder; import net.snowflake.ingest.streaming.InsertValidationResponse; import net.snowflake.ingest.streaming.OpenChannelRequest; -import net.snowflake.ingest.utils.Constants; +import net.snowflake.ingest.utils.ParameterProvider; import net.snowflake.ingest.utils.Utils; import org.apache.parquet.column.ParquetProperties; import org.junit.Assert; @@ -58,7 +58,7 @@ public void setUpBeforeAll() { CloseableHttpClient httpClient = MockSnowflakeServiceClient.createHttpClient(); RequestBuilder requestBuilder = MockSnowflakeServiceClient.createRequestBuilder(httpClient); Properties prop = new Properties(); - prop.setProperty(Constants.STREAMING_ICEBERG, String.valueOf(isIcebergMode)); + prop.setProperty(ParameterProvider.STREAMING_ICEBERG, String.valueOf(isIcebergMode)); client = new SnowflakeStreamingIngestClientInternal<>( "client_PARQUET", null, prop, httpClient, true, requestBuilder, new HashMap<>()); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java index 2bc4e1cdc..d0cbbb940 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java @@ -104,10 +104,7 @@ public void withNullInputs() { ParameterProvider parameterProvider = new ParameterProvider(null, null, isIcebergMode); Assert.assertEquals( - isIcebergMode - ? ParameterProvider.MAX_CLIENT_LAG_ICEBERG_MODE_DEFAULT - : ParameterProvider.MAX_CLIENT_LAG_DEFAULT, - parameterProvider.getCachedMaxClientLagInMs()); + ParameterProvider.MAX_CLIENT_LAG_DEFAULT, parameterProvider.getCachedMaxClientLagInMs()); Assert.assertEquals( ParameterProvider.BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS_DEFAULT, parameterProvider.getBufferFlushCheckIntervalInMs()); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/RegisterServiceTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/RegisterServiceTest.java index f507c056a..fbccaaa24 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/RegisterServiceTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/RegisterServiceTest.java @@ -18,8 +18,8 @@ import java.util.concurrent.TimeoutException; import net.snowflake.client.jdbc.internal.apache.http.impl.client.CloseableHttpClient; import net.snowflake.ingest.connection.RequestBuilder; -import net.snowflake.ingest.utils.Constants; import net.snowflake.ingest.utils.Pair; +import net.snowflake.ingest.utils.ParameterProvider; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -44,7 +44,7 @@ public void setup() { CloseableHttpClient httpClient = MockSnowflakeServiceClient.createHttpClient(); RequestBuilder requestBuilder = MockSnowflakeServiceClient.createRequestBuilder(httpClient); Properties prop = new Properties(); - prop.setProperty(Constants.STREAMING_ICEBERG, String.valueOf(isIcebergMode)); + prop.setProperty(ParameterProvider.STREAMING_ICEBERG, String.valueOf(isIcebergMode)); client = new SnowflakeStreamingIngestClientInternal<>( "client", null, prop, httpClient, true, requestBuilder, new HashMap<>()); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java index 2c824cb17..042834ce7 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java @@ -2026,11 +2026,7 @@ public void testParquetFileNameMetadata() throws IOException { flusher.serialize(Collections.singletonList(data), filePath); BdecParquetReader reader = new BdecParquetReader(result.chunkData.toByteArray()); - Assert.assertEquals( - filePath, - reader - .getKeyValueMetadata() - .get(isIcebergMode ? Constants.FULL_FILL_NAME_KEY : Constants.PRIMARY_FILE_ID_KEY)); + Assert.assertEquals(filePath, reader.getKeyValueMetadata().get(Constants.PRIMARY_FILE_ID_KEY)); } @Test diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java index 6fbced668..d5233abac 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java @@ -97,7 +97,7 @@ public void setup() { CloseableHttpClient httpClient = MockSnowflakeServiceClient.createHttpClient(apiOverride); RequestBuilder requestBuilder = MockSnowflakeServiceClient.createRequestBuilder(httpClient); prop = new Properties(); - prop.setProperty(Constants.STREAMING_ICEBERG, String.valueOf(isIcebergMode)); + prop.setProperty(ParameterProvider.STREAMING_ICEBERG, String.valueOf(isIcebergMode)); client = new SnowflakeStreamingIngestClientInternal<>( "client", null, prop, httpClient, true, requestBuilder, new HashMap<>()); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java index ccce30520..da93d0167 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java @@ -112,7 +112,7 @@ public void setup() throws Exception { prop.put(PRIVATE_KEY, TestUtils.getPrivateKey()); prop.put(ROLE, TestUtils.getRole()); isIcebergProp = new Properties(); - isIcebergProp.setProperty(Constants.STREAMING_ICEBERG, String.valueOf(isIcebergMode)); + isIcebergProp.setProperty(ParameterProvider.STREAMING_ICEBERG, String.valueOf(isIcebergMode)); apiOverride = new MockSnowflakeServiceClient.ApiOverride(); CloseableHttpClient httpClient = MockSnowflakeServiceClient.createHttpClient(apiOverride); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java index 41fdb8321..108eac12f 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java @@ -123,7 +123,7 @@ protected void setUp( conn.createStatement().execute(String.format("use warehouse %s;", TestUtils.getWarehouse())); Properties props = TestUtils.getProperties(Constants.BdecVersion.THREE, false); - props.setProperty(Constants.STREAMING_ICEBERG, String.valueOf(isIceberg)); + props.setProperty(ParameterProvider.STREAMING_ICEBERG, String.valueOf(isIceberg)); if (props.getProperty(ROLE).equals("DEFAULT_ROLE")) { props.setProperty(ROLE, "ACCOUNTADMIN"); }