From a37433d3c53bc442d28003b275ca50c755ca9ea2 Mon Sep 17 00:00:00 2001 From: Alec Huang Date: Fri, 26 Jul 2024 14:56:21 -0700 Subject: [PATCH 1/7] parameter set & disable blob encryption --- ...SnowflakeStreamingIngestClientFactory.java | 18 ++- .../streaming/internal/BlobBuilder.java | 66 +++++---- .../streaming/internal/FlushService.java | 7 +- .../internal/InternalParameterProvider.java | 20 +++ ...nowflakeStreamingIngestClientInternal.java | 29 +++- .../ingest/utils/ParameterProvider.java | 133 +++++++++++++----- .../streaming/internal/BlobBuilderTest.java | 16 ++- .../streaming/internal/ChannelCacheTest.java | 17 ++- .../streaming/internal/FlushServiceTest.java | 27 +++- .../internal/InsertRowsBenchmarkTest.java | 6 +- .../streaming/internal/OAuthBasicTest.java | 6 +- .../internal/ParameterProviderTest.java | 102 +++++++++++--- .../internal/RegisterServiceTest.java | 19 ++- .../SnowflakeStreamingIngestChannelTest.java | 46 ++++-- .../SnowflakeStreamingIngestClientTest.java | 38 ++++- 15 files changed, 427 insertions(+), 123 deletions(-) create mode 100644 src/main/java/net/snowflake/ingest/streaming/internal/InternalParameterProvider.java diff --git a/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClientFactory.java b/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClientFactory.java index cd6d78787..6b0f057a7 100644 --- a/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClientFactory.java +++ b/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClientFactory.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 Snowflake Computing Inc. All rights reserved. + * Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved. */ package net.snowflake.ingest.streaming; @@ -31,6 +31,10 @@ public static class Builder { // Indicates whether it's under test mode private boolean isTestMode; + // Indicates whether it's streaming to Iceberg tables. Open channels on regular tables should + // fail in this mode. + private boolean isIcebergMode; + private Builder(String name) { this.name = name; } @@ -50,6 +54,11 @@ public Builder setIsTestMode(boolean isTestMode) { return this; } + Builder setIsIcebergMode(boolean isIcebergMode) { + this.isIcebergMode = isIcebergMode; + return this; + } + public SnowflakeStreamingIngestClient build() { Utils.assertStringNotNullOrEmpty("client name", this.name); Utils.assertNotNull("connection properties", this.prop); @@ -58,7 +67,12 @@ public SnowflakeStreamingIngestClient build() { SnowflakeURL accountURL = new SnowflakeURL(prop.getProperty(Constants.ACCOUNT_URL)); return new SnowflakeStreamingIngestClientInternal<>( - this.name, accountURL, prop, this.parameterOverrides, this.isTestMode); + this.name, + accountURL, + prop, + this.parameterOverrides, + this.isIcebergMode, + this.isTestMode); } } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java b/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java index b88090e01..fcbed8176 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 Snowflake Computing Inc. All rights reserved. + * Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved. */ package net.snowflake.ingest.streaming.internal; @@ -61,10 +61,14 @@ class BlobBuilder { * @param blobData All the data for one blob. 
Assumes that all ChannelData in the inner List
   *     belongs to the same table. Will error if this is not the case
   * @param bdecVersion version of blob
+   * @param encrypt whether to encrypt the chunk data
   * @return {@link Blob} data
   */
  static <T> Blob constructBlobAndMetadata(
-      String filePath, List<List<ChannelData<T>>> blobData, Constants.BdecVersion bdecVersion)
+      String filePath,
+      List<List<ChannelData<T>>> blobData,
+      Constants.BdecVersion bdecVersion,
+      boolean encrypt)
       throws IOException, NoSuchPaddingException, NoSuchAlgorithmException,
           InvalidAlgorithmParameterException, InvalidKeyException, IllegalBlockSizeException,
           BadPaddingException {
@@ -83,25 +87,32 @@ static <T> Blob constructBlobAndMetadata(
           flusher.serialize(channelsDataPerTable, filePath);
 
       if (!serializedChunk.channelsMetadataList.isEmpty()) {
-        ByteArrayOutputStream chunkData = serializedChunk.chunkData;
-        Pair<byte[], Integer> paddedChunk =
-            padChunk(chunkData, Constants.ENCRYPTION_ALGORITHM_BLOCK_SIZE_BYTES);
-        byte[] paddedChunkData = paddedChunk.getFirst();
-        int paddedChunkLength = paddedChunk.getSecond();
+        byte[] compressedChunkData;
+        int chunkLength;
 
-        // Encrypt the compressed chunk data, the encryption key is derived using the key from
-        // server with the full blob path.
-        // We need to maintain IV as a block counter for the whole file, even interleaved,
-        // to align with decryption on the Snowflake query path.
-        // TODO: address alignment for the header SNOW-557866
-        long iv = curDataSize / Constants.ENCRYPTION_ALGORITHM_BLOCK_SIZE_BYTES;
-        byte[] encryptedCompressedChunkData =
-            Cryptor.encrypt(
-                paddedChunkData, firstChannelFlushContext.getEncryptionKey(), filePath, iv);
+        if (encrypt) {
+          Pair<byte[], Integer> paddedChunk =
+              padChunk(serializedChunk.chunkData, Constants.ENCRYPTION_ALGORITHM_BLOCK_SIZE_BYTES);
+          byte[] paddedChunkData = paddedChunk.getFirst();
+          chunkLength = paddedChunk.getSecond();
+
+          // Encrypt the compressed chunk data; the encryption key is derived from the key sent by
+          // the server and the full blob path.
+          // We need to maintain IV as a block counter for the whole file, even interleaved,
+          // to align with decryption on the Snowflake query path.
+          // TODO: address alignment for the header SNOW-557866
+          long iv = curDataSize / Constants.ENCRYPTION_ALGORITHM_BLOCK_SIZE_BYTES;
+          compressedChunkData =
+              Cryptor.encrypt(
+                  paddedChunkData, firstChannelFlushContext.getEncryptionKey(), filePath, iv);
+        } else {
+          compressedChunkData = serializedChunk.chunkData.toByteArray();
+          chunkLength = compressedChunkData.length;
+        }
 
         // Compute the md5 of the chunk data
-        String md5 = computeMD5(encryptedCompressedChunkData, paddedChunkLength);
-        int encryptedCompressedChunkDataSize = encryptedCompressedChunkData.length;
+        String md5 = computeMD5(compressedChunkData, chunkLength);
+        int compressedChunkDataSize = compressedChunkData.length;
 
         // Create chunk metadata
         long startOffset = curDataSize;
@@ -111,9 +122,9 @@
             // The start offset will be updated later in BlobBuilder#build to include the blob
             // header
             .setChunkStartOffset(startOffset)
-            // The paddedChunkLength is used because it is the actual data size used for
+            // The chunkLength is used because it is the actual data size used for
             // decompression and md5 calculation on server side.
-            .setChunkLength(paddedChunkLength)
+            .setChunkLength(chunkLength)
             .setUncompressedChunkLength((int) serializedChunk.chunkEstimatedUncompressedSize)
             .setChannelList(serializedChunk.channelsMetadataList)
             .setChunkMD5(md5)
@@ -127,21 +138,22 @@
       // Add chunk metadata and data to the list
       chunksMetadataList.add(chunkMetadata);
-      chunksDataList.add(encryptedCompressedChunkData);
-      curDataSize += encryptedCompressedChunkDataSize;
-      crc.update(encryptedCompressedChunkData, 0, encryptedCompressedChunkDataSize);
+      chunksDataList.add(compressedChunkData);
+      curDataSize += compressedChunkDataSize;
+      crc.update(compressedChunkData, 0, compressedChunkDataSize);
 
       logger.logInfo(
           "Finish building chunk in blob={}, table={}, rowCount={}, startOffset={},"
-              + " estimatedUncompressedSize={}, paddedChunkLength={}, encryptedCompressedSize={},"
-              + " bdecVersion={}",
+              + " estimatedUncompressedSize={}, chunkLength={}, compressedSize={},"
+              + " encryption={}, bdecVersion={}",
           filePath,
           firstChannelFlushContext.getFullyQualifiedTableName(),
           serializedChunk.rowCount,
           startOffset,
           serializedChunk.chunkEstimatedUncompressedSize,
-          paddedChunkLength,
-          encryptedCompressedChunkDataSize,
+          chunkLength,
+          compressedChunkDataSize,
+          encrypt,
           bdecVersion);
     }
   }
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java
index 954abfc4a..7c343209b 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java
@@ -522,7 +522,12 @@ BlobMetadata buildAndUpload(
     Timer.Context buildContext = Utils.createTimerContext(this.owningClient.buildLatency);
 
     // Construct the blob along with the metadata of the blob
-    BlobBuilder.Blob blob = BlobBuilder.constructBlobAndMetadata(blobPath, blobData, bdecVersion);
+    BlobBuilder.Blob blob =
+        BlobBuilder.constructBlobAndMetadata(
+            blobPath,
+            blobData,
+            bdecVersion,
+            this.owningClient.getInternalParameterProvider().getEnableChunkEncryption());
 
     blob.blobStats.setBuildDurationMs(buildContext);
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/InternalParameterProvider.java b/src/main/java/net/snowflake/ingest/streaming/internal/InternalParameterProvider.java
new file mode 100644
index 000000000..46a99b671
--- /dev/null
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/InternalParameterProvider.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
+ */
+
+package net.snowflake.ingest.streaming.internal;
+
+/** A class that provides non-configurable constants which depend on Iceberg or non-Iceberg mode */
+class InternalParameterProvider {
+  private final boolean isIcebergMode;
+
+  InternalParameterProvider(boolean isIcebergMode) {
+    this.isIcebergMode = isIcebergMode;
+  }
+
+  boolean getEnableChunkEncryption() {
+    // In Iceberg mode, chunk encryption is disabled because Iceberg tables do not need
+    // client-side encryption; in non-Iceberg mode it remains enabled.
+    return !isIcebergMode;
+  }
+}
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java
index 6331a4045..a53c5fb56 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java
@@ -96,6 +96,9 @@ public class SnowflakeStreamingIngestClientInternal<T> implements SnowflakeStrea
   // Snowflake role for the client to use
   private String role;
 
+  // Provides constant values determined by the Iceberg or non-Iceberg mode
+  private final InternalParameterProvider internalParameterProvider;
+
   // Http client to send HTTP requests to Snowflake
   private final CloseableHttpClient httpClient;
 
@@ -111,6 +114,9 @@ public class SnowflakeStreamingIngestClientInternal<T> implements SnowflakeStrea
   // Indicates whether the client has closed
   private volatile boolean isClosed;
 
+  // Indicates whether the client is streaming to Iceberg tables
+  private final boolean isIcebergMode;
+
   // Indicates whether the client is under test mode
   private final boolean isTestMode;
 
@@ -146,6 +152,7 @@ public class SnowflakeStreamingIngestClientInternal<T> implements SnowflakeStrea
    * @param prop connection properties
    * @param httpClient http client for sending request
    * @param isTestMode whether we're under test mode
+   * @param isIcebergMode whether we're streaming to Iceberg tables
    * @param requestBuilder http request builder
    * @param parameterOverrides parameters we override in case we want to set different values
    */
@@ -154,13 +161,16 @@ public class SnowflakeStreamingIngestClientInternal<T> implements SnowflakeStrea
       SnowflakeURL accountURL,
       Properties prop,
       CloseableHttpClient httpClient,
+      boolean isIcebergMode,
       boolean isTestMode,
       RequestBuilder requestBuilder,
       Map<String, Object> parameterOverrides) {
-    this.parameterProvider = new ParameterProvider(parameterOverrides, prop);
+    this.parameterProvider = new ParameterProvider(parameterOverrides, prop, isIcebergMode);
+    this.internalParameterProvider = new InternalParameterProvider(isIcebergMode);
 
     this.name = name;
     String accountName = accountURL == null ? null : accountURL.getAccount();
+    this.isIcebergMode = isIcebergMode;
     this.isTestMode = isTestMode;
     this.httpClient = httpClient == null ? 
HttpUtil.getHttpClient(accountName) : httpClient; this.channelCache = new ChannelCache<>(); @@ -250,6 +260,7 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea * @param accountURL Snowflake account url * @param prop connection properties * @param parameterOverrides map of parameters to override for this client + * @param isIcebergMode whether we're streaming to Iceberg tables * @param isTestMode indicates whether it's under test mode */ public SnowflakeStreamingIngestClientInternal( @@ -257,16 +268,17 @@ public SnowflakeStreamingIngestClientInternal( SnowflakeURL accountURL, Properties prop, Map parameterOverrides, + boolean isIcebergMode, boolean isTestMode) { - this(name, accountURL, prop, null, isTestMode, null, parameterOverrides); + this(name, accountURL, prop, null, isIcebergMode, isTestMode, null, parameterOverrides); } /*** Constructor for TEST ONLY * * @param name the name of the client */ - SnowflakeStreamingIngestClientInternal(String name) { - this(name, null, null, null, true, null, new HashMap<>()); + SnowflakeStreamingIngestClientInternal(String name, boolean isIcebergMode) { + this(name, null, null, null, isIcebergMode, true, null, new HashMap<>()); } // TESTING ONLY - inject the request builder @@ -875,6 +887,15 @@ ParameterProvider getParameterProvider() { return parameterProvider; } + /** + * Get InternalParameterProvider with internal parameters + * + * @return {@link InternalParameterProvider} used by the client + */ + InternalParameterProvider getInternalParameterProvider() { + return internalParameterProvider; + } + /** * Set refresh token, this method is for refresh token renewal without requiring to restart * client. This method only works when the authorization type is OAuth diff --git a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java index 9d9db5356..7d8cb230f 100644 --- a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java +++ b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java @@ -1,5 +1,11 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. 
+ */
+
 package net.snowflake.ingest.utils;
 
+import static net.snowflake.ingest.utils.ErrorCode.INVALID_CONFIG_PARAMETER;
+
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
@@ -50,11 +56,12 @@ public class ParameterProvider {
   public static final int IO_TIME_CPU_RATIO_DEFAULT = 2;
   public static final int BLOB_UPLOAD_MAX_RETRY_COUNT_DEFAULT = 24;
   public static final long MAX_MEMORY_LIMIT_IN_BYTES_DEFAULT = -1L;
-  public static final long MAX_CHANNEL_SIZE_IN_BYTES_DEFAULT = 64 * 1024 * 1024;
-  public static final long MAX_CHUNK_SIZE_IN_BYTES_DEFAULT = 256 * 1024 * 1024;
+  public static final long MAX_CHANNEL_SIZE_IN_BYTES_DEFAULT = 128 * 1024 * 1024;
+  public static final long MAX_CHUNK_SIZE_IN_BYTES_DEFAULT = 512 * 1024 * 1024;
 
   // Lag related parameters
   public static final long MAX_CLIENT_LAG_DEFAULT = 1000; // 1 second
+  public static final long MAX_CLIENT_LAG_ICEBERG_MODE_DEFAULT = 30000; // 30 seconds
   static final long MAX_CLIENT_LAG_MS_MIN = TimeUnit.SECONDS.toMillis(1);
   static final long MAX_CLIENT_LAG_MS_MAX = TimeUnit.MINUTES.toMillis(10);
 
@@ -64,6 +71,9 @@ public class ParameterProvider {
   public static final Constants.BdecParquetCompression BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT =
       Constants.BdecParquetCompression.GZIP;
 
+  /* Iceberg mode parameters: when streaming to Iceberg tables, different defaults are required because the client generates Parquet files instead of BDEC files. */
+  public static final int MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_ICEBERG_MODE_DEFAULT = 1;
+
   /* Parameter that enables using internal Parquet buffers for buffering of rows before serializing.
   It reduces memory consumption compared to using Java Objects for buffering.*/
   public static final boolean ENABLE_PARQUET_INTERNAL_BUFFERING_DEFAULT = false;
@@ -80,18 +90,25 @@ public class ParameterProvider {
    *
    * @param parameterOverrides Map of parameter name to value
    * @param props Properties from profile file
+   * @param isIcebergMode whether the provided parameters should be validated and adjusted for
+   *     Iceberg mode
    */
-  public ParameterProvider(Map<String, Object> parameterOverrides, Properties props) {
-    this.setParameterMap(parameterOverrides, props);
+  public ParameterProvider(
+      Map<String, Object> parameterOverrides, Properties props, boolean isIcebergMode) {
+    this.setParameterMap(parameterOverrides, props, isIcebergMode);
   }
 
   /** Empty constructor for tests */
-  public ParameterProvider() {
-    this(null, null);
+  public ParameterProvider(boolean isIcebergMode) {
+    this(null, null, isIcebergMode);
   }
 
-  private void updateValue(
-      String key, Object defaultValue, Map<String, Object> parameterOverrides, Properties props) {
+  private void checkAndUpdate(
+      String key,
+      Object defaultValue,
+      Map<String, Object> parameterOverrides,
+      Properties props,
+      boolean enforceDefault) {
     if (parameterOverrides != null && props != null) {
       this.parameterMap.put(
           key, parameterOverrides.getOrDefault(key, props.getOrDefault(key, defaultValue)));
@@ -99,6 +116,19 @@ private void updateValue(
       this.parameterMap.put(key, parameterOverrides.getOrDefault(key, defaultValue));
     } else if (props != null) {
       this.parameterMap.put(key, props.getOrDefault(key, defaultValue));
+    } else {
+      this.parameterMap.put(key, defaultValue);
+    }
+
+    if (enforceDefault) {
+      if (!this.parameterMap.getOrDefault(key, defaultValue).equals(defaultValue)) {
+        throw new SFException(
+            INVALID_CONFIG_PARAMETER,
+            String.format(
+                "The value %s for %s is not configurable, should be %s.",
+                this.parameterMap.get(key), key, defaultValue));
+      }
+      this.parameterMap.put(key, defaultValue);
+    }
   }
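[Reviewer note, not part of the patch: a minimal sketch of the semantics that checkAndUpdate introduces — an entry in parameterOverrides beats props, props beats the default, and enforceDefault rejects any non-default value with INVALID_CONFIG_PARAMETER. The driver class and the chosen property values are hypothetical; the ParameterProvider calls mirror the tests changed later in this patch.]

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import net.snowflake.ingest.utils.ParameterProvider;
import net.snowflake.ingest.utils.SFException;

// Hypothetical driver, for illustration only.
class CheckAndUpdateSketch {
  public static void main(String[] args) {
    Map<String, Object> overrides = new HashMap<>();
    Properties props = new Properties();

    // Precedence: parameterOverrides > props > default.
    overrides.put(ParameterProvider.MAX_CLIENT_LAG, "2 seconds");
    props.put(ParameterProvider.MAX_CLIENT_LAG, "5 seconds");
    ParameterProvider provider =
        new ParameterProvider(overrides, props, /* isIcebergMode */ false);
    System.out.println(provider.getCachedMaxClientLagInMs()); // 2000: the override wins

    // Enforcement: in Iceberg mode, MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST is passed
    // with enforceDefault=true, so a conflicting override fails fast instead of being kept.
    overrides.put(ParameterProvider.MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST, 100);
    try {
      new ParameterProvider(overrides, props, /* isIcebergMode */ true);
    } catch (SFException e) {
      System.out.println(e.getMessage()); // "... is not configurable, should be 1."
    }
  }
}
```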
@@ -107,8 +137,11 @@ private void updateValue( * * @param parameterOverrides Map of parameter name -> value * @param props Properties file provided to client constructor + * @param isIcebergMode If the provided parameters need to be verified and modified to meet + * Iceberg mode */ - private void setParameterMap(Map parameterOverrides, Properties props) { + private void setParameterMap( + Map parameterOverrides, Properties props, boolean isIcebergMode) { // BUFFER_FLUSH_INTERVAL_IN_MILLIS is deprecated and disallowed if ((parameterOverrides != null && parameterOverrides.containsKey(BUFFER_FLUSH_INTERVAL_IN_MILLIS)) @@ -119,75 +152,101 @@ private void setParameterMap(Map parameterOverrides, Properties BUFFER_FLUSH_INTERVAL_IN_MILLIS, MAX_CLIENT_LAG)); } - this.updateValue( + this.checkAndUpdate( BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS, BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS_DEFAULT, parameterOverrides, - props); + props, + false); - this.updateValue( + this.checkAndUpdate( INSERT_THROTTLE_INTERVAL_IN_MILLIS, INSERT_THROTTLE_INTERVAL_IN_MILLIS_DEFAULT, parameterOverrides, - props); + props, + false); - this.updateValue( + this.checkAndUpdate( INSERT_THROTTLE_THRESHOLD_IN_PERCENTAGE, INSERT_THROTTLE_THRESHOLD_IN_PERCENTAGE_DEFAULT, parameterOverrides, - props); + props, + false); - this.updateValue( + this.checkAndUpdate( INSERT_THROTTLE_THRESHOLD_IN_BYTES, INSERT_THROTTLE_THRESHOLD_IN_BYTES_DEFAULT, parameterOverrides, - props); + props, + false); - this.updateValue( + this.checkAndUpdate( ENABLE_SNOWPIPE_STREAMING_METRICS, SNOWPIPE_STREAMING_METRICS_DEFAULT, parameterOverrides, - props); + props, + false); - this.updateValue(BLOB_FORMAT_VERSION, BLOB_FORMAT_VERSION_DEFAULT, parameterOverrides, props); + this.checkAndUpdate( + BLOB_FORMAT_VERSION, BLOB_FORMAT_VERSION_DEFAULT, parameterOverrides, props, false); getBlobFormatVersion(); // to verify parsing the configured value - this.updateValue(IO_TIME_CPU_RATIO, IO_TIME_CPU_RATIO_DEFAULT, parameterOverrides, props); + this.checkAndUpdate( + IO_TIME_CPU_RATIO, IO_TIME_CPU_RATIO_DEFAULT, parameterOverrides, props, false); - this.updateValue( + this.checkAndUpdate( BLOB_UPLOAD_MAX_RETRY_COUNT, BLOB_UPLOAD_MAX_RETRY_COUNT_DEFAULT, parameterOverrides, - props); + props, + false); - this.updateValue( - MAX_MEMORY_LIMIT_IN_BYTES, MAX_MEMORY_LIMIT_IN_BYTES_DEFAULT, parameterOverrides, props); + this.checkAndUpdate( + MAX_MEMORY_LIMIT_IN_BYTES, + MAX_MEMORY_LIMIT_IN_BYTES_DEFAULT, + parameterOverrides, + props, + false); - this.updateValue( + this.checkAndUpdate( ENABLE_PARQUET_INTERNAL_BUFFERING, ENABLE_PARQUET_INTERNAL_BUFFERING_DEFAULT, parameterOverrides, - props); + props, + false); - this.updateValue( - MAX_CHANNEL_SIZE_IN_BYTES, MAX_CHANNEL_SIZE_IN_BYTES_DEFAULT, parameterOverrides, props); + this.checkAndUpdate( + MAX_CHANNEL_SIZE_IN_BYTES, + MAX_CHANNEL_SIZE_IN_BYTES_DEFAULT, + parameterOverrides, + props, + false); - this.updateValue( - MAX_CHUNK_SIZE_IN_BYTES, MAX_CHUNK_SIZE_IN_BYTES_DEFAULT, parameterOverrides, props); + this.checkAndUpdate( + MAX_CHUNK_SIZE_IN_BYTES, MAX_CHUNK_SIZE_IN_BYTES_DEFAULT, parameterOverrides, props, false); - this.updateValue(MAX_CLIENT_LAG, MAX_CLIENT_LAG_DEFAULT, parameterOverrides, props); + this.checkAndUpdate( + MAX_CLIENT_LAG, + isIcebergMode ? MAX_CLIENT_LAG_ICEBERG_MODE_DEFAULT : MAX_CLIENT_LAG_DEFAULT, + parameterOverrides, + props, + false); - this.updateValue( + this.checkAndUpdate( MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST, - MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_DEFAULT, + isIcebergMode + ? 
MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_ICEBERG_MODE_DEFAULT + : MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_DEFAULT, parameterOverrides, - props); + props, + isIcebergMode); - this.updateValue( + this.checkAndUpdate( BDEC_PARQUET_COMPRESSION_ALGORITHM, BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT, parameterOverrides, - props); + props, + false); } /** @return Longest interval in milliseconds between buffer flushes */ diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java index e220aec79..32a352ba9 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + package net.snowflake.ingest.streaming.internal; import java.io.ByteArrayOutputStream; @@ -23,18 +27,21 @@ public void testSerializationErrors() throws Exception { BlobBuilder.constructBlobAndMetadata( "a.bdec", Collections.singletonList(createChannelDataPerTable(1, false)), - Constants.BdecVersion.THREE); + Constants.BdecVersion.THREE, + true); BlobBuilder.constructBlobAndMetadata( "a.bdec", Collections.singletonList(createChannelDataPerTable(1, true)), - Constants.BdecVersion.THREE); + Constants.BdecVersion.THREE, + true); // Construction fails if metadata contains 0 rows and data 1 row try { BlobBuilder.constructBlobAndMetadata( "a.bdec", Collections.singletonList(createChannelDataPerTable(0, false)), - Constants.BdecVersion.THREE); + Constants.BdecVersion.THREE, + true); Assert.fail("Should not pass enableParquetInternalBuffering=false"); } catch (SFException e) { Assert.assertEquals(ErrorCode.INTERNAL_ERROR.getMessageCode(), e.getVendorCode()); @@ -52,7 +59,8 @@ public void testSerializationErrors() throws Exception { BlobBuilder.constructBlobAndMetadata( "a.bdec", Collections.singletonList(createChannelDataPerTable(0, true)), - Constants.BdecVersion.THREE); + Constants.BdecVersion.THREE, + true); Assert.fail("Should not pass enableParquetInternalBuffering=true"); } catch (SFException e) { Assert.assertEquals(ErrorCode.INTERNAL_ERROR.getMessageCode(), e.getVendorCode()); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java index 947908ef9..3882717ff 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. 
+ */ + package net.snowflake.ingest.streaming.internal; import static java.time.ZoneOffset.UTC; @@ -9,8 +13,19 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +@RunWith(Parameterized.class) public class ChannelCacheTest { + + @Parameterized.Parameters(name = "isIcebergMode: {0}") + public static Object[] isIcebergMode() { + return new Object[] {false, true}; + } + + @Parameterized.Parameter public static boolean isIcebergMode; + ChannelCache cache; SnowflakeStreamingIngestClientInternal client; SnowflakeStreamingIngestChannelInternal channel1; @@ -24,7 +39,7 @@ public class ChannelCacheTest { @Before public void setup() { cache = new ChannelCache<>(); - client = new SnowflakeStreamingIngestClientInternal<>("client"); + client = new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode); channel1 = new SnowflakeStreamingIngestChannelInternal<>( "channel1", diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java index 8ac1d2b85..796580e95 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + package net.snowflake.ingest.streaming.internal; import static net.snowflake.ingest.utils.Constants.BLOB_CHECKSUM_SIZE_IN_BYTES; @@ -48,11 +52,22 @@ import net.snowflake.ingest.utils.SFException; import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatchers; import org.mockito.Mockito; +@RunWith(Parameterized.class) public class FlushServiceTest { + + @Parameterized.Parameters(name = "isIcebergMode: {0}") + public static Object[] isIcebergMode() { + return new Object[] {false, true}; + } + + @Parameterized.Parameter public static boolean isIcebergMode; + public FlushServiceTest() { this.testContextFactory = ParquetTestContext.createFactory(); } @@ -86,9 +101,12 @@ private abstract static class TestContext implements AutoCloseable { TestContext() { storage = Mockito.mock(StreamingIngestStorage.class); - parameterProvider = new ParameterProvider(); + parameterProvider = new ParameterProvider(isIcebergMode); + InternalParameterProvider internalParameterProvider = + new InternalParameterProvider(isIcebergMode); client = Mockito.mock(SnowflakeStreamingIngestClientInternal.class); Mockito.when(client.getParameterProvider()).thenReturn(parameterProvider); + Mockito.when(client.getInternalParameterProvider()).thenReturn(internalParameterProvider); storageManager = Mockito.spy(new InternalStageManager<>(true, "role", "client", null)); Mockito.doReturn(storage).when(storageManager).getStorage(ArgumentMatchers.any()); Mockito.when(storageManager.getClientPrefix()).thenReturn("client_prefix"); @@ -596,7 +614,10 @@ public void runTestBlobSplitDueToNumberOfChunks(int numberOfRows) throws Excepti Math.ceil( (double) numberOfRows / channelsPerTable - / ParameterProvider.MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_DEFAULT); + / (isIcebergMode + ? 
ParameterProvider + .MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_ICEBERG_MODE_DEFAULT + : ParameterProvider.MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_DEFAULT)); final TestContext>> testContext = testContextFactory.create(); @@ -874,7 +895,7 @@ public void testInvalidateChannels() { // Create a new Client in order to not interfere with other tests SnowflakeStreamingIngestClientInternal client = Mockito.mock(SnowflakeStreamingIngestClientInternal.class); - ParameterProvider parameterProvider = new ParameterProvider(); + ParameterProvider parameterProvider = new ParameterProvider(isIcebergMode); ChannelCache channelCache = new ChannelCache<>(); Mockito.when(client.getChannelCache()).thenReturn(channelCache); Mockito.when(client.getParameterProvider()).thenReturn(parameterProvider); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/InsertRowsBenchmarkTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/InsertRowsBenchmarkTest.java index 5b28e9c45..d28b22669 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/InsertRowsBenchmarkTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/InsertRowsBenchmarkTest.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + package net.snowflake.ingest.streaming.internal; import static java.time.ZoneOffset.UTC; @@ -36,7 +40,7 @@ public class InsertRowsBenchmarkTest { @Setup(Level.Trial) public void setUpBeforeAll() { - client = new SnowflakeStreamingIngestClientInternal("client_PARQUET"); + client = new SnowflakeStreamingIngestClientInternal("client_PARQUET", false); channel = new SnowflakeStreamingIngestChannelInternal<>( "channel", diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/OAuthBasicTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/OAuthBasicTest.java index 6351f267d..832ecd248 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/OAuthBasicTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/OAuthBasicTest.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + package net.snowflake.ingest.streaming.internal; import java.util.Properties; @@ -114,7 +118,7 @@ public void testCreateOAuthClient() throws Exception { @Test public void testSetRefreshToken() throws Exception { SnowflakeStreamingIngestClientInternal client = - new SnowflakeStreamingIngestClientInternal<>("TEST_CLIENT"); + new SnowflakeStreamingIngestClientInternal<>("TEST_CLIENT", false); MockOAuthClient mockOAuthClient = new MockOAuthClient(); OAuthManager oAuthManager = diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java index 86cece9c7..6850daa76 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java @@ -1,12 +1,20 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. 
+ */ + package net.snowflake.ingest.streaming.internal; +import static net.snowflake.ingest.utils.ParameterProvider.MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_ICEBERG_MODE_DEFAULT; + import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import net.snowflake.ingest.utils.Constants; +import net.snowflake.ingest.utils.ErrorCode; import net.snowflake.ingest.utils.ParameterProvider; +import net.snowflake.ingest.utils.SFException; import org.junit.Assert; import org.junit.Test; @@ -31,7 +39,7 @@ private Map getStartingParameterMap() { public void withValuesSet() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); Assert.assertEquals(1000L, parameterProvider.getCachedMaxClientLagInMs()); Assert.assertEquals(4L, parameterProvider.getBufferFlushCheckIntervalInMs()); @@ -54,7 +62,7 @@ public void withNullProps() { parameterMap.put(ParameterProvider.BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS, 4L); parameterMap.put(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_PERCENTAGE, 6); parameterMap.put(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_BYTES, 1024); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, null); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, null, false); Assert.assertEquals(3000, parameterProvider.getCachedMaxClientLagInMs()); Assert.assertEquals(4, parameterProvider.getBufferFlushCheckIntervalInMs()); @@ -72,7 +80,7 @@ public void withNullParameterMap() { props.put(ParameterProvider.BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS, 4L); props.put(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_PERCENTAGE, 6); props.put(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_BYTES, 1024); - ParameterProvider parameterProvider = new ParameterProvider(null, props); + ParameterProvider parameterProvider = new ParameterProvider(null, props, false); Assert.assertEquals(3000, parameterProvider.getCachedMaxClientLagInMs()); Assert.assertEquals(4, parameterProvider.getBufferFlushCheckIntervalInMs()); @@ -85,7 +93,7 @@ public void withNullParameterMap() { @Test public void withNullInputs() { - ParameterProvider parameterProvider = new ParameterProvider(null, null); + ParameterProvider parameterProvider = new ParameterProvider(null, null, false); Assert.assertEquals( ParameterProvider.MAX_CLIENT_LAG_DEFAULT, parameterProvider.getCachedMaxClientLagInMs()); @@ -105,7 +113,7 @@ public void withNullInputs() { @Test public void withDefaultValues() { - ParameterProvider parameterProvider = new ParameterProvider(); + ParameterProvider parameterProvider = new ParameterProvider(false); Assert.assertEquals( ParameterProvider.MAX_CLIENT_LAG_DEFAULT, parameterProvider.getCachedMaxClientLagInMs()); @@ -137,12 +145,50 @@ public void withDefaultValues() { parameterProvider.getBdecParquetCompressionAlgorithm()); } + @Test + public void withIcebergDefaultValues() { + ParameterProvider parameterProvider = new ParameterProvider(true); + + Assert.assertEquals( + ParameterProvider.MAX_CLIENT_LAG_ICEBERG_MODE_DEFAULT, + parameterProvider.getCachedMaxClientLagInMs()); + Assert.assertEquals( + ParameterProvider.BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS_DEFAULT, + parameterProvider.getBufferFlushCheckIntervalInMs()); + Assert.assertEquals( + ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_PERCENTAGE_DEFAULT, + 
parameterProvider.getInsertThrottleThresholdInPercentage()); + Assert.assertEquals( + ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_BYTES_DEFAULT, + parameterProvider.getInsertThrottleThresholdInBytes()); + Assert.assertEquals( + ParameterProvider.INSERT_THROTTLE_INTERVAL_IN_MILLIS_DEFAULT, + parameterProvider.getInsertThrottleIntervalInMs()); + Assert.assertEquals( + ParameterProvider.IO_TIME_CPU_RATIO_DEFAULT, parameterProvider.getIOTimeCpuRatio()); + Assert.assertEquals( + ParameterProvider.BLOB_UPLOAD_MAX_RETRY_COUNT_DEFAULT, + parameterProvider.getBlobUploadMaxRetryCount()); + Assert.assertEquals( + ParameterProvider.MAX_MEMORY_LIMIT_IN_BYTES_DEFAULT, + parameterProvider.getMaxMemoryLimitInBytes()); + Assert.assertEquals( + ParameterProvider.MAX_CHANNEL_SIZE_IN_BYTES_DEFAULT, + parameterProvider.getMaxChannelSizeInBytes()); + Assert.assertEquals( + ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT, + parameterProvider.getBdecParquetCompressionAlgorithm()); + Assert.assertEquals( + MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_ICEBERG_MODE_DEFAULT, + parameterProvider.getMaxChunksInBlobAndRegistrationRequest()); + } + @Test public void testMaxClientLagEnabled() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "2 second"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); Assert.assertEquals(2000, parameterProvider.getCachedMaxClientLagInMs()); // call again to trigger caching logic Assert.assertEquals(2000, parameterProvider.getCachedMaxClientLagInMs()); @@ -153,7 +199,7 @@ public void testMaxClientLagEnabledPluralTimeUnit() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "2 seconds"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); Assert.assertEquals(2000, parameterProvider.getCachedMaxClientLagInMs()); } @@ -162,7 +208,7 @@ public void testMaxClientLagEnabledMinuteTimeUnit() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "1 minute"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); Assert.assertEquals(60000, parameterProvider.getCachedMaxClientLagInMs()); } @@ -171,7 +217,7 @@ public void testMaxClientLagEnabledMinuteTimeUnitPluralTimeUnit() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "2 minutes"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); Assert.assertEquals(120000, parameterProvider.getCachedMaxClientLagInMs()); } @@ -179,7 +225,7 @@ public void testMaxClientLagEnabledMinuteTimeUnitPluralTimeUnit() { public void testMaxClientLagEnabledDefaultValue() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); Assert.assertEquals( 
ParameterProvider.MAX_CLIENT_LAG_DEFAULT, parameterProvider.getCachedMaxClientLagInMs()); } @@ -189,7 +235,7 @@ public void testMaxClientLagEnabledDefaultUnit() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "3000"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); Assert.assertEquals(3000, parameterProvider.getCachedMaxClientLagInMs()); } @@ -198,7 +244,7 @@ public void testMaxClientLagEnabledLongInput() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, 3000L); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); Assert.assertEquals(3000, parameterProvider.getCachedMaxClientLagInMs()); } @@ -207,7 +253,7 @@ public void testMaxClientLagEnabledMissingUnitTimeUnitSupplied() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, " year"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); try { parameterProvider.getCachedMaxClientLagInMs(); Assert.fail("Should not have succeeded"); @@ -221,7 +267,7 @@ public void testMaxClientLagEnabledInvalidTimeUnit() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "1 year"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); try { parameterProvider.getCachedMaxClientLagInMs(); Assert.fail("Should not have succeeded"); @@ -235,7 +281,7 @@ public void testMaxClientLagEnabledInvalidUnit() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "banana minute"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); try { parameterProvider.getCachedMaxClientLagInMs(); Assert.fail("Should not have succeeded"); @@ -249,7 +295,7 @@ public void testMaxClientLagEnabledThresholdBelow() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "0 second"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); try { parameterProvider.getCachedMaxClientLagInMs(); Assert.fail("Should not have succeeded"); @@ -263,7 +309,7 @@ public void testMaxClientLagEnabledThresholdAbove() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "11 minutes"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); try { parameterProvider.getCachedMaxClientLagInMs(); Assert.fail("Should not have succeeded"); @@ -277,7 +323,7 @@ public void testMaxClientLagEnableEmptyInput() { Properties prop = 
new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, ""); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); try { parameterProvider.getCachedMaxClientLagInMs(); Assert.fail("Should not have succeeded"); @@ -291,8 +337,20 @@ public void testMaxChunksInBlobAndRegistrationRequest() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put("max_chunks_in_blob_and_registration_request", 1); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); + Assert.assertEquals(1, parameterProvider.getMaxChunksInBlobAndRegistrationRequest()); + + parameterProvider = new ParameterProvider(parameterMap, prop, true); Assert.assertEquals(1, parameterProvider.getMaxChunksInBlobAndRegistrationRequest()); + + SFException e = + Assert.assertThrows( + SFException.class, + () -> { + parameterMap.put("max_chunks_in_blob_and_registration_request", 100); + new ParameterProvider(parameterMap, prop, true); + }); + Assert.assertEquals(e.getVendorCode(), ErrorCode.INVALID_CONFIG_PARAMETER.getMessageCode()); } @Test @@ -303,7 +361,7 @@ public void testValidCompressionAlgorithmsAndWithUppercaseLowerCase() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM, v); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); Assert.assertEquals( Constants.BdecParquetCompression.GZIP, parameterProvider.getBdecParquetCompressionAlgorithm()); @@ -314,7 +372,7 @@ public void testValidCompressionAlgorithmsAndWithUppercaseLowerCase() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM, v); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); Assert.assertEquals( Constants.BdecParquetCompression.ZSTD, parameterProvider.getBdecParquetCompressionAlgorithm()); @@ -326,7 +384,7 @@ public void testInvalidCompressionAlgorithm() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM, "invalid_comp"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); try { parameterProvider.getBdecParquetCompressionAlgorithm(); Assert.fail("Should not have succeeded"); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/RegisterServiceTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/RegisterServiceTest.java index 37eb5f96e..9d7b95008 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/RegisterServiceTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/RegisterServiceTest.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. 
+ */ + package net.snowflake.ingest.streaming.internal; import static net.snowflake.ingest.utils.Constants.BLOB_UPLOAD_TIMEOUT_IN_SEC; @@ -14,8 +18,17 @@ import org.junit.Assert; import org.junit.Ignore; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +@RunWith(Parameterized.class) public class RegisterServiceTest { + @Parameterized.Parameters(name = "isIcebergMode: {0}") + public static Object[] isIcebergMode() { + return new Object[] {false, true}; + } + + @Parameterized.Parameter public boolean isIcebergMode; @Test public void testRegisterService() throws ExecutionException, InterruptedException { @@ -45,7 +58,7 @@ public void testRegisterService() throws ExecutionException, InterruptedExceptio @Test public void testRegisterServiceTimeoutException() throws Exception { SnowflakeStreamingIngestClientInternal client = - new SnowflakeStreamingIngestClientInternal<>("client"); + new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode); RegisterService rs = new RegisterService<>(client, true); Pair, CompletableFuture> blobFuture1 = @@ -73,7 +86,7 @@ public void testRegisterServiceTimeoutException() throws Exception { @Test public void testRegisterServiceTimeoutException_testRetries() throws Exception { SnowflakeStreamingIngestClientInternal client = - new SnowflakeStreamingIngestClientInternal<>("client"); + new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode); RegisterService rs = new RegisterService<>(client, true); Pair, CompletableFuture> blobFuture1 = @@ -107,7 +120,7 @@ public void testRegisterServiceTimeoutException_testRetries() throws Exception { @Test public void testRegisterServiceNonTimeoutException() { SnowflakeStreamingIngestClientInternal client = - new SnowflakeStreamingIngestClientInternal<>("client"); + new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode); RegisterService rs = new RegisterService<>(client, true); CompletableFuture future = new CompletableFuture<>(); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java index 5d8d8d36a..dc3285df8 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java @@ -48,8 +48,11 @@ import net.snowflake.ingest.utils.Utils; import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.mockito.Mockito; +@RunWith(Parameterized.class) public class SnowflakeStreamingIngestChannelTest { /** @@ -71,6 +74,13 @@ public long getFreeMemory() { } } + @Parameterized.Parameters(name = "isIcebergMode: {0}") + public static Object[] isIcebergMode() { + return new Object[] {false, true}; + } + + @Parameterized.Parameter public boolean isIcebergMode; + @Test public void testChannelFactoryNullFields() { String name = "CHANNEL"; @@ -80,7 +90,7 @@ public void testChannelFactoryNullFields() { long channelSequencer = 0L; long rowSequencer = 0L; SnowflakeStreamingIngestClientInternal client = - new SnowflakeStreamingIngestClientInternal<>("client"); + new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode); Object[] fields = new Object[] { @@ -122,7 +132,7 @@ public void testChannelFactorySuccess() { long rowSequencer = 0L; SnowflakeStreamingIngestClientInternal client = - new 
SnowflakeStreamingIngestClientInternal<>("client"); + new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode); SnowflakeStreamingIngestChannelInternal channel = SnowflakeStreamingIngestChannelFactory.builder(name) @@ -157,7 +167,7 @@ public void testChannelFactorySuccess() { @Test public void testChannelValid() { SnowflakeStreamingIngestClientInternal client = - new SnowflakeStreamingIngestClientInternal<>("client"); + new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode); SnowflakeStreamingIngestChannelInternal channel = new SnowflakeStreamingIngestChannelInternal<>( "channel", @@ -207,7 +217,7 @@ public void testChannelValid() { @Test public void testChannelClose() { SnowflakeStreamingIngestClientInternal client = - new SnowflakeStreamingIngestClientInternal<>("client"); + new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode); SnowflakeStreamingIngestChannelInternal channel = new SnowflakeStreamingIngestChannelInternal<>( "channel", @@ -351,6 +361,7 @@ public void testOpenChannelErrorResponse() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + isIcebergMode, true, requestBuilder, null); @@ -421,6 +432,7 @@ public void testOpenChannelSnowflakeInternalErrorResponse() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + isIcebergMode, true, requestBuilder, null); @@ -443,6 +455,10 @@ public void testOpenChannelSnowflakeInternalErrorResponse() throws Exception { @Test public void testOpenChannelSuccessResponse() throws Exception { + // TODO: SNOW-1490151 Iceberg testing gaps + if (isIcebergMode) { + return; + } String name = "CHANNEL"; String dbName = "STREAMINGINGEST_TEST"; String schemaName = "PUBLIC"; @@ -503,6 +519,7 @@ public void testOpenChannelSuccessResponse() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + isIcebergMode, true, requestBuilder, null); @@ -541,7 +558,9 @@ public void testOpenChannelSuccessResponse() throws Exception { @Test public void testInsertRow() { SnowflakeStreamingIngestClientInternal client; - client = new SnowflakeStreamingIngestClientInternal("client_PARQUET"); + client = + new SnowflakeStreamingIngestClientInternal( + "client_PARQUET", isIcebergMode); SnowflakeStreamingIngestChannelInternal channel = new SnowflakeStreamingIngestChannelInternal<>( "channel", @@ -625,7 +644,8 @@ public void testInsertTooLargeRow() { schema.forEach(x -> row.put(x.getName(), byteArrayOneMb)); SnowflakeStreamingIngestClientInternal client; - client = new SnowflakeStreamingIngestClientInternal("test_client"); + client = + new SnowflakeStreamingIngestClientInternal("test_client", isIcebergMode); // Test channel with on error CONTINUE SnowflakeStreamingIngestChannelInternal channel = @@ -709,7 +729,7 @@ public void testInsertRowThrottling() { memoryInfoProvider.maxMemory = maxMemory; SnowflakeStreamingIngestClientInternal client = - new SnowflakeStreamingIngestClientInternal<>("client"); + new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode); SnowflakeStreamingIngestChannelInternal channel = new SnowflakeStreamingIngestChannelInternal<>( "channel", @@ -725,7 +745,7 @@ public void testInsertRowThrottling() { OpenChannelRequest.OnErrorOption.CONTINUE, UTC); - ParameterProvider parameterProvider = new ParameterProvider(); + ParameterProvider parameterProvider = new ParameterProvider(isIcebergMode); memoryInfoProvider.freeMemory = maxMemory * (parameterProvider.getInsertThrottleThresholdInPercentage() - 1) / 100; @@ -755,7 
+775,7 @@ public void testInsertRowThrottling() { @Test public void testFlush() throws Exception { SnowflakeStreamingIngestClientInternal client = - Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client")); + Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode)); SnowflakeStreamingIngestChannelInternal channel = new SnowflakeStreamingIngestChannelInternal<>( "channel", @@ -791,7 +811,7 @@ public void testFlush() throws Exception { @Test public void testClose() throws Exception { SnowflakeStreamingIngestClientInternal client = - Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client")); + Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode)); SnowflakeStreamingIngestChannel channel = new SnowflakeStreamingIngestChannelInternal<>( "channel", @@ -825,7 +845,7 @@ public void testClose() throws Exception { @Test public void testDropOnClose() throws Exception { SnowflakeStreamingIngestClientInternal client = - Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client")); + Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode)); SnowflakeStreamingIngestChannelInternal channel = new SnowflakeStreamingIngestChannelInternal<>( "channel", @@ -862,7 +882,7 @@ public void testDropOnClose() throws Exception { @Test public void testDropOnCloseInvalidChannel() throws Exception { SnowflakeStreamingIngestClientInternal client = - Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client")); + Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode)); SnowflakeStreamingIngestChannelInternal channel = new SnowflakeStreamingIngestChannelInternal<>( "channel", @@ -895,7 +915,7 @@ public void testDropOnCloseInvalidChannel() throws Exception { public void testGetLatestCommittedOffsetToken() { String offsetToken = "10"; SnowflakeStreamingIngestClientInternal client = - Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client")); + Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode)); SnowflakeStreamingIngestChannel channel = new SnowflakeStreamingIngestChannelInternal<>( "channel", diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java index 553efbd31..9d52ccf4e 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + package net.snowflake.ingest.streaming.internal; import static java.time.ZoneOffset.UTC; @@ -66,9 +70,12 @@ import org.junit.Before; import org.junit.Ignore; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.mockito.ArgumentMatchers; import org.mockito.Mockito; +@RunWith(Parameterized.class) public class SnowflakeStreamingIngestClientTest { private static final ObjectMapper objectMapper = new ObjectMapper(); @@ -79,6 +86,14 @@ public class SnowflakeStreamingIngestClientTest { SnowflakeStreamingIngestChannelInternal channel3; SnowflakeStreamingIngestChannelInternal channel4; + // TODO: Add IcebergMode = True after Streaming to Iceberg is supported. 
+ @Parameterized.Parameters(name = "isIcebergMode: {0}") + public static Object[] isIcebergMode() { + return new Object[] {false}; + } + + @Parameterized.Parameter public boolean isIcebergMode; + @Before public void setup() throws Exception { objectMapper.setVisibility(PropertyAccessor.GETTER, JsonAutoDetect.Visibility.ANY); @@ -98,6 +113,7 @@ public void setup() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + isIcebergMode, true, requestBuilder, null); @@ -362,6 +378,7 @@ public void testGetChannelsStatusWithRequest() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + isIcebergMode, true, requestBuilder, null); @@ -421,6 +438,7 @@ public void testDropChannel() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + isIcebergMode, true, requestBuilder, null); @@ -466,6 +484,7 @@ public void testGetChannelsStatusWithRequestError() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + isIcebergMode, true, requestBuilder, null); @@ -519,6 +538,7 @@ public void testRegisterBlobRequestCreationSuccess() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + isIcebergMode, true, requestBuilder, null); @@ -710,6 +730,7 @@ public void testGetRetryBlobs() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + isIcebergMode, true, requestBuilder, null); @@ -751,6 +772,7 @@ public void testRegisterBlobErrorResponse() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + isIcebergMode, true, requestBuilder, null); @@ -798,6 +820,7 @@ public void testRegisterBlobSnowflakeInternalErrorResponse() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + isIcebergMode, true, requestBuilder, null); @@ -854,6 +877,7 @@ public void testRegisterBlobSuccessResponse() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + isIcebergMode, true, requestBuilder, null); @@ -938,6 +962,7 @@ public void testRegisterBlobsRetries() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + isIcebergMode, true, requestBuilder, null); @@ -967,6 +992,7 @@ public void testRegisterBlobChunkLimit() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + isIcebergMode, true, requestBuilder, null)); @@ -1139,6 +1165,7 @@ public void testRegisterBlobsRetriesSucceeds() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + isIcebergMode, true, requestBuilder, null); @@ -1213,6 +1240,7 @@ public void testRegisterBlobResponseWithInvalidChannel() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + isIcebergMode, true, requestBuilder, null); @@ -1263,7 +1291,7 @@ public void testRegisterBlobResponseWithInvalidChannel() throws Exception { @Test public void testFlush() throws Exception { SnowflakeStreamingIngestClientInternal client = - Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client")); + Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode)); ChannelsStatusResponse response = new ChannelsStatusResponse(); response.setStatusCode(0L); response.setMessage("Success"); @@ -1285,7 +1313,7 @@ public void testFlush() throws Exception { @Test public void testClose() throws Exception { SnowflakeStreamingIngestClientInternal client = - Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client")); + 
Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode)); ChannelsStatusResponse response = new ChannelsStatusResponse(); response.setStatusCode(0L); response.setMessage("Success"); @@ -1319,7 +1347,7 @@ public void testClose() throws Exception { @Test public void testCloseWithError() throws Exception { SnowflakeStreamingIngestClientInternal client = - Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client")); + Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode)); CompletableFuture future = new CompletableFuture<>(); future.completeExceptionally(new Exception("Simulating Error")); @@ -1357,7 +1385,7 @@ public void testCloseWithError() throws Exception { @Test public void testVerifyChannelsAreFullyCommittedSuccess() throws Exception { SnowflakeStreamingIngestClientInternal client = - Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client")); + Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode)); SnowflakeStreamingIngestChannelInternal channel = new SnowflakeStreamingIngestChannelInternal<>( "channel1", @@ -1405,6 +1433,7 @@ public void testFlushServiceException() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + isIcebergMode, true, requestBuilder, parameterMap); @@ -1441,6 +1470,7 @@ public void testGetLatestCommittedOffsetTokens() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + isIcebergMode, true, requestBuilder, null); From adb7b80e80e6ddad8300fe9608e978cdf6369259 Mon Sep 17 00:00:00 2001 From: Alec Huang Date: Mon, 29 Jul 2024 17:12:36 -0700 Subject: [PATCH 2/7] fix tests --- .../ingest/streaming/internal/BlobBuilder.java | 8 +++++--- .../ingest/utils/ParameterProvider.java | 4 ++-- .../streaming/internal/BlobBuilderTest.java | 17 +++++++++++++---- .../internal/InsertRowsBenchmarkTest.java | 1 + .../streaming/internal/OAuthBasicTest.java | 1 + 5 files changed, 22 insertions(+), 9 deletions(-) diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java b/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java index fcbed8176..8e65ed677 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java @@ -87,8 +87,9 @@ static Blob constructBlobAndMetadata( flusher.serialize(channelsDataPerTable, filePath); if (!serializedChunk.channelsMetadataList.isEmpty()) { - byte[] compressedChunkData; - int chunkLength; + final byte[] compressedChunkData; + final int chunkLength; + final int compressedChunkDataSize; if (encrypt) { Pair paddedChunk = @@ -105,14 +106,15 @@ static Blob constructBlobAndMetadata( compressedChunkData = Cryptor.encrypt( paddedChunkData, firstChannelFlushContext.getEncryptionKey(), filePath, iv); + compressedChunkDataSize = compressedChunkData.length; } else { compressedChunkData = serializedChunk.chunkData.toByteArray(); chunkLength = compressedChunkData.length; + compressedChunkDataSize = chunkLength; } // Compute the md5 of the chunk data String md5 = computeMD5(compressedChunkData, chunkLength); - int compressedChunkDataSize = compressedChunkData.length; // Create chunk metadata long startOffset = curDataSize; diff --git a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java index 7d8cb230f..8e8f3ed62 100644 --- a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java +++ 
b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java @@ -56,8 +56,8 @@ public class ParameterProvider { public static final int IO_TIME_CPU_RATIO_DEFAULT = 2; public static final int BLOB_UPLOAD_MAX_RETRY_COUNT_DEFAULT = 24; public static final long MAX_MEMORY_LIMIT_IN_BYTES_DEFAULT = -1L; - public static final long MAX_CHANNEL_SIZE_IN_BYTES_DEFAULT = 128 * 1024 * 1024; - public static final long MAX_CHUNK_SIZE_IN_BYTES_DEFAULT = 512 * 1024 * 1024; + public static final long MAX_CHANNEL_SIZE_IN_BYTES_DEFAULT = 64 * 1024 * 1024; + public static final long MAX_CHUNK_SIZE_IN_BYTES_DEFAULT = 256 * 1024 * 1024; // Lag related parameters public static final long MAX_CLIENT_LAG_DEFAULT = 1000; // 1 second diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java index 32a352ba9..17d90f458 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java @@ -17,9 +17,18 @@ import org.apache.parquet.schema.MessageType; import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.mockito.Mockito; +@RunWith(Parameterized.class) public class BlobBuilderTest { + @Parameterized.Parameters(name = "encrypt: {0}") + public static Object[] encrypt() { + return new Object[] {false, true}; + } + + @Parameterized.Parameter public boolean encrypt; @Test public void testSerializationErrors() throws Exception { @@ -28,12 +37,12 @@ public void testSerializationErrors() throws Exception { "a.bdec", Collections.singletonList(createChannelDataPerTable(1, false)), Constants.BdecVersion.THREE, - true); + encrypt); BlobBuilder.constructBlobAndMetadata( "a.bdec", Collections.singletonList(createChannelDataPerTable(1, true)), Constants.BdecVersion.THREE, - true); + encrypt); // Construction fails if metadata contains 0 rows and data 1 row try { @@ -41,7 +50,7 @@ public void testSerializationErrors() throws Exception { "a.bdec", Collections.singletonList(createChannelDataPerTable(0, false)), Constants.BdecVersion.THREE, - true); + encrypt); Assert.fail("Should not pass enableParquetInternalBuffering=false"); } catch (SFException e) { Assert.assertEquals(ErrorCode.INTERNAL_ERROR.getMessageCode(), e.getVendorCode()); @@ -60,7 +69,7 @@ public void testSerializationErrors() throws Exception { "a.bdec", Collections.singletonList(createChannelDataPerTable(0, true)), Constants.BdecVersion.THREE, - true); + encrypt); Assert.fail("Should not pass enableParquetInternalBuffering=true"); } catch (SFException e) { Assert.assertEquals(ErrorCode.INTERNAL_ERROR.getMessageCode(), e.getVendorCode()); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/InsertRowsBenchmarkTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/InsertRowsBenchmarkTest.java index d28b22669..a58fc9ef2 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/InsertRowsBenchmarkTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/InsertRowsBenchmarkTest.java @@ -40,6 +40,7 @@ public class InsertRowsBenchmarkTest { @Setup(Level.Trial) public void setUpBeforeAll() { + // SNOW-1490151: Testing gaps client = new SnowflakeStreamingIngestClientInternal("client_PARQUET", false); channel = new SnowflakeStreamingIngestChannelInternal<>( diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/OAuthBasicTest.java 
b/src/test/java/net/snowflake/ingest/streaming/internal/OAuthBasicTest.java index 832ecd248..688ec0c91 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/OAuthBasicTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/OAuthBasicTest.java @@ -117,6 +117,7 @@ public void testCreateOAuthClient() throws Exception { @Test public void testSetRefreshToken() throws Exception { + // SNOW-1490151: Testing gaps SnowflakeStreamingIngestClientInternal client = new SnowflakeStreamingIngestClientInternal<>("TEST_CLIENT", false); MockOAuthClient mockOAuthClient = new MockOAuthClient(); From 3d95d22ea67c87a5d758d5aacd1f497a025302a7 Mon Sep 17 00:00:00 2001 From: Alec Huang Date: Tue, 30 Jul 2024 15:02:13 -0700 Subject: [PATCH 3/7] split max chunks in blob and max chunks in registration request --- .../streaming/internal/FlushService.java | 6 +- ...nowflakeStreamingIngestClientInternal.java | 9 +- .../ingest/utils/ParameterProvider.java | 39 +++-- .../streaming/internal/FlushServiceTest.java | 5 +- .../internal/InsertRowsBenchmarkTest.java | 13 +- .../streaming/internal/ManyTablesIT.java | 7 +- .../internal/ParameterProviderTest.java | 148 +++++++++--------- .../SnowflakeStreamingIngestClientTest.java | 3 +- 8 files changed, 122 insertions(+), 108 deletions(-) diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java index 7c343209b..5e28e0964 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java @@ -335,15 +335,13 @@ void distributeFlushTasks() { channelsDataPerTable.addAll(leftoverChannelsDataPerTable); leftoverChannelsDataPerTable.clear(); } else if (blobData.size() - >= this.owningClient - .getParameterProvider() - .getMaxChunksInBlobAndRegistrationRequest()) { + >= this.owningClient.getParameterProvider().getMaxChunksInBlob()) { // Create a new blob if the current one already contains max allowed number of chunks logger.logInfo( "Max allowed number of chunks in the current blob reached. 
chunkCount={}" + " maxChunkCount={} currentBlobPath={}", blobData.size(), - this.owningClient.getParameterProvider().getMaxChunksInBlobAndRegistrationRequest(), + this.owningClient.getParameterProvider().getMaxChunksInBlob(), blobPath); break; } else { diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java index a53c5fb56..413432e50 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java @@ -507,21 +507,20 @@ List> partitionBlobListForRegistrationRequest(List> result = new ArrayList<>(); List currentBatch = new ArrayList<>(); int chunksInCurrentBatch = 0; - int maxChunksInBlobAndRegistrationRequest = - parameterProvider.getMaxChunksInBlobAndRegistrationRequest(); + int maxChunksInRegistrationRequest = parameterProvider.getMaxChunksInRegistrationRequest(); for (BlobMetadata blob : blobs) { - if (blob.getChunks().size() > maxChunksInBlobAndRegistrationRequest) { + if (blob.getChunks().size() > maxChunksInRegistrationRequest) { throw new SFException( ErrorCode.INTERNAL_ERROR, String.format( "Incorrectly generated blob detected - number of chunks in the blob is larger than" + " the max allowed number of chunks. Please report this bug to Snowflake." + " bdec=%s chunkCount=%d maxAllowedChunkCount=%d", - blob.getPath(), blob.getChunks().size(), maxChunksInBlobAndRegistrationRequest)); + blob.getPath(), blob.getChunks().size(), maxChunksInRegistrationRequest)); } - if (chunksInCurrentBatch + blob.getChunks().size() > maxChunksInBlobAndRegistrationRequest) { + if (chunksInCurrentBatch + blob.getChunks().size() > maxChunksInRegistrationRequest) { // Newly added BDEC file would exceed the max number of chunks in a single registration // request. 
We put chunks collected so far into the result list and create a new batch with // the current blob diff --git a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java index 8e8f3ed62..42dbd8532 100644 --- a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java +++ b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java @@ -37,8 +37,9 @@ public class ParameterProvider { public static final String MAX_CHUNK_SIZE_IN_BYTES = "MAX_CHUNK_SIZE_IN_BYTES".toLowerCase(); public static final String MAX_ALLOWED_ROW_SIZE_IN_BYTES = "MAX_ALLOWED_ROW_SIZE_IN_BYTES".toLowerCase(); - public static final String MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST = - "MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST".toLowerCase(); + public static final String MAX_CHUNKS_IN_BLOB = "MAX_CHUNKS_IN_BLOB".toLowerCase(); + public static final String MAX_CHUNKS_IN_REGISTRATION_REQUEST = + "MAX_CHUNKS_IN_REGISTRATION_REQUEST".toLowerCase(); public static final String MAX_CLIENT_LAG = "MAX_CLIENT_LAG".toLowerCase(); @@ -66,13 +67,14 @@ public class ParameterProvider { static final long MAX_CLIENT_LAG_MS_MAX = TimeUnit.MINUTES.toMillis(10); public static final long MAX_ALLOWED_ROW_SIZE_IN_BYTES_DEFAULT = 64 * 1024 * 1024; // 64 MB - public static final int MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_DEFAULT = 100; + public static final int MAX_CHUNKS_IN_BLOB_DEFAULT = 100; + public static final int MAX_CHUNKS_IN_REGISTRATION_REQUEST_DEFAULT = 100; public static final Constants.BdecParquetCompression BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT = Constants.BdecParquetCompression.GZIP; /* Iceberg mode parameters: When streaming to Iceberg mode, different default parameters are required because it generates Parquet files instead of BDEC files. */ - public static final int MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_ICEBERG_MODE_DEFAULT = 1; + public static final int MAX_CHUNKS_IN_BLOB_ICEBERG_MODE_DEFAULT = 1; /* Parameter that enables using internal Parquet buffers for buffering of rows before serializing. It reduces memory consumption compared to using Java Objects for buffering.*/ @@ -233,14 +235,19 @@ private void setParameterMap( false); this.checkAndUpdate( - MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST, - isIcebergMode - ? MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_ICEBERG_MODE_DEFAULT - : MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_DEFAULT, + MAX_CHUNKS_IN_BLOB, + isIcebergMode ? MAX_CHUNKS_IN_BLOB_ICEBERG_MODE_DEFAULT : MAX_CHUNKS_IN_BLOB_DEFAULT, parameterOverrides, props, isIcebergMode); + this.checkAndUpdate( + MAX_CHUNKS_IN_REGISTRATION_REQUEST, + MAX_CHUNKS_IN_REGISTRATION_REQUEST_DEFAULT, + parameterOverrides, + props, + false); + this.checkAndUpdate( BDEC_PARQUET_COMPRESSION_ALGORITHM, BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT, @@ -443,15 +450,17 @@ public long getMaxAllowedRowSizeInBytes() { return (val instanceof String) ? Long.parseLong(val.toString()) : (long) val; } - /** - * @return The max number of chunks that can be put into a single BDEC or blob registration - * request. - */ - public int getMaxChunksInBlobAndRegistrationRequest() { + /** @return The max number of chunks that can be put into a single BDEC. */ + public int getMaxChunksInBlob() { + Object val = this.parameterMap.getOrDefault(MAX_CHUNKS_IN_BLOB, MAX_CHUNKS_IN_BLOB_DEFAULT); + return (val instanceof String) ? 
Integer.parseInt(val.toString()) : (int) val; + } + + /** @return The max number of chunks that can be put into a single blob registration request. */ + public int getMaxChunksInRegistrationRequest() { Object val = this.parameterMap.getOrDefault( - MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST, - MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_DEFAULT); + MAX_CHUNKS_IN_REGISTRATION_REQUEST, MAX_CHUNKS_IN_REGISTRATION_REQUEST_DEFAULT); return (val instanceof String) ? Integer.parseInt(val.toString()) : (int) val; } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java index 796580e95..4f93e39f9 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java @@ -615,9 +615,8 @@ public void runTestBlobSplitDueToNumberOfChunks(int numberOfRows) throws Excepti (double) numberOfRows / channelsPerTable / (isIcebergMode - ? ParameterProvider - .MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_ICEBERG_MODE_DEFAULT - : ParameterProvider.MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_DEFAULT)); + ? ParameterProvider.MAX_CHUNKS_IN_BLOB_ICEBERG_MODE_DEFAULT + : ParameterProvider.MAX_CHUNKS_IN_BLOB_DEFAULT)); final TestContext>> testContext = testContextFactory.create(); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/InsertRowsBenchmarkTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/InsertRowsBenchmarkTest.java index a58fc9ef2..37e41df5a 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/InsertRowsBenchmarkTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/InsertRowsBenchmarkTest.java @@ -15,6 +15,8 @@ import net.snowflake.ingest.utils.Utils; import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.Level; import org.openjdk.jmh.annotations.Mode; @@ -30,7 +32,14 @@ import org.openjdk.jmh.runner.options.TimeValue; @State(Scope.Thread) +@RunWith(Parameterized.class) public class InsertRowsBenchmarkTest { + @Parameterized.Parameters(name = "isIcebergMode: {0}") + public static Object[] isIcebergMode() { + return new Object[] {false, true}; + } + + @Parameterized.Parameter public boolean isIcebergMode; private SnowflakeStreamingIngestChannelInternal channel; private SnowflakeStreamingIngestClientInternal client; @@ -41,7 +50,9 @@ public class InsertRowsBenchmarkTest { @Setup(Level.Trial) public void setUpBeforeAll() { // SNOW-1490151: Testing gaps - client = new SnowflakeStreamingIngestClientInternal("client_PARQUET", false); + client = + new SnowflakeStreamingIngestClientInternal( + "client_PARQUET", isIcebergMode); channel = new SnowflakeStreamingIngestChannelInternal<>( "channel", diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/ManyTablesIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/ManyTablesIT.java index d32adfe3f..408bf66ca 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/ManyTablesIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/ManyTablesIT.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. 
+ */ + package net.snowflake.ingest.streaming.internal; import static net.snowflake.ingest.utils.Constants.ROLE; @@ -37,7 +41,8 @@ public class ManyTablesIT { @Before public void setUp() throws Exception { Properties props = TestUtils.getProperties(Constants.BdecVersion.THREE, false); - props.put(ParameterProvider.MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST, 2); + props.put(ParameterProvider.MAX_CHUNKS_IN_BLOB, 2); + props.put(ParameterProvider.MAX_CHUNKS_IN_REGISTRATION_REQUEST, 2); if (props.getProperty(ROLE).equals("DEFAULT_ROLE")) { props.setProperty(ROLE, "ACCOUNTADMIN"); } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java index 6850daa76..cd42e8faa 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java @@ -4,8 +4,6 @@ package net.snowflake.ingest.streaming.internal; -import static net.snowflake.ingest.utils.ParameterProvider.MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_ICEBERG_MODE_DEFAULT; - import java.util.Arrays; import java.util.HashMap; import java.util.List; @@ -17,9 +15,19 @@ import net.snowflake.ingest.utils.SFException; import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +@RunWith(Parameterized.class) public class ParameterProviderTest { + @Parameterized.Parameters(name = "isIcebergMode: {0}") + public static Object[] isIcebergMode() { + return new Object[] {false, true}; + } + + @Parameterized.Parameter public boolean isIcebergMode; + private Map getStartingParameterMap() { Map parameterMap = new HashMap<>(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, 1000L); @@ -39,7 +47,7 @@ private Map getStartingParameterMap() { public void withValuesSet() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, isIcebergMode); Assert.assertEquals(1000L, parameterProvider.getCachedMaxClientLagInMs()); Assert.assertEquals(4L, parameterProvider.getBufferFlushCheckIntervalInMs()); @@ -62,7 +70,7 @@ public void withNullProps() { parameterMap.put(ParameterProvider.BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS, 4L); parameterMap.put(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_PERCENTAGE, 6); parameterMap.put(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_BYTES, 1024); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, null, false); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, null, isIcebergMode); Assert.assertEquals(3000, parameterProvider.getCachedMaxClientLagInMs()); Assert.assertEquals(4, parameterProvider.getBufferFlushCheckIntervalInMs()); @@ -80,7 +88,7 @@ public void withNullParameterMap() { props.put(ParameterProvider.BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS, 4L); props.put(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_PERCENTAGE, 6); props.put(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_BYTES, 1024); - ParameterProvider parameterProvider = new ParameterProvider(null, props, false); + ParameterProvider parameterProvider = new ParameterProvider(null, props, isIcebergMode); Assert.assertEquals(3000, parameterProvider.getCachedMaxClientLagInMs()); Assert.assertEquals(4, 
parameterProvider.getBufferFlushCheckIntervalInMs()); @@ -93,10 +101,13 @@ public void withNullParameterMap() { @Test public void withNullInputs() { - ParameterProvider parameterProvider = new ParameterProvider(null, null, false); + ParameterProvider parameterProvider = new ParameterProvider(null, null, isIcebergMode); Assert.assertEquals( - ParameterProvider.MAX_CLIENT_LAG_DEFAULT, parameterProvider.getCachedMaxClientLagInMs()); + isIcebergMode + ? ParameterProvider.MAX_CLIENT_LAG_ICEBERG_MODE_DEFAULT + : ParameterProvider.MAX_CLIENT_LAG_DEFAULT, + parameterProvider.getCachedMaxClientLagInMs()); Assert.assertEquals( ParameterProvider.BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS_DEFAULT, parameterProvider.getBufferFlushCheckIntervalInMs()); @@ -113,44 +124,12 @@ public void withNullInputs() { @Test public void withDefaultValues() { - ParameterProvider parameterProvider = new ParameterProvider(false); + ParameterProvider parameterProvider = new ParameterProvider(isIcebergMode); Assert.assertEquals( - ParameterProvider.MAX_CLIENT_LAG_DEFAULT, parameterProvider.getCachedMaxClientLagInMs()); - Assert.assertEquals( - ParameterProvider.BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS_DEFAULT, - parameterProvider.getBufferFlushCheckIntervalInMs()); - Assert.assertEquals( - ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_PERCENTAGE_DEFAULT, - parameterProvider.getInsertThrottleThresholdInPercentage()); - Assert.assertEquals( - ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_BYTES_DEFAULT, - parameterProvider.getInsertThrottleThresholdInBytes()); - Assert.assertEquals( - ParameterProvider.INSERT_THROTTLE_INTERVAL_IN_MILLIS_DEFAULT, - parameterProvider.getInsertThrottleIntervalInMs()); - Assert.assertEquals( - ParameterProvider.IO_TIME_CPU_RATIO_DEFAULT, parameterProvider.getIOTimeCpuRatio()); - Assert.assertEquals( - ParameterProvider.BLOB_UPLOAD_MAX_RETRY_COUNT_DEFAULT, - parameterProvider.getBlobUploadMaxRetryCount()); - Assert.assertEquals( - ParameterProvider.MAX_MEMORY_LIMIT_IN_BYTES_DEFAULT, - parameterProvider.getMaxMemoryLimitInBytes()); - Assert.assertEquals( - ParameterProvider.MAX_CHANNEL_SIZE_IN_BYTES_DEFAULT, - parameterProvider.getMaxChannelSizeInBytes()); - Assert.assertEquals( - ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT, - parameterProvider.getBdecParquetCompressionAlgorithm()); - } - - @Test - public void withIcebergDefaultValues() { - ParameterProvider parameterProvider = new ParameterProvider(true); - - Assert.assertEquals( - ParameterProvider.MAX_CLIENT_LAG_ICEBERG_MODE_DEFAULT, + isIcebergMode + ? ParameterProvider.MAX_CLIENT_LAG_ICEBERG_MODE_DEFAULT + : ParameterProvider.MAX_CLIENT_LAG_DEFAULT, parameterProvider.getCachedMaxClientLagInMs()); Assert.assertEquals( ParameterProvider.BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS_DEFAULT, @@ -179,8 +158,13 @@ public void withIcebergDefaultValues() { ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT, parameterProvider.getBdecParquetCompressionAlgorithm()); Assert.assertEquals( - MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_ICEBERG_MODE_DEFAULT, - parameterProvider.getMaxChunksInBlobAndRegistrationRequest()); + isIcebergMode + ? 
ParameterProvider.MAX_CHUNKS_IN_BLOB_ICEBERG_MODE_DEFAULT + : ParameterProvider.MAX_CHUNKS_IN_BLOB_DEFAULT, + parameterProvider.getMaxChunksInBlob()); + Assert.assertEquals( + ParameterProvider.MAX_CHUNKS_IN_REGISTRATION_REQUEST_DEFAULT, + parameterProvider.getMaxChunksInRegistrationRequest()); } @Test @@ -188,7 +172,7 @@ public void testMaxClientLagEnabled() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "2 second"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, isIcebergMode); Assert.assertEquals(2000, parameterProvider.getCachedMaxClientLagInMs()); // call again to trigger caching logic Assert.assertEquals(2000, parameterProvider.getCachedMaxClientLagInMs()); @@ -199,7 +183,7 @@ public void testMaxClientLagEnabledPluralTimeUnit() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "2 seconds"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, isIcebergMode); Assert.assertEquals(2000, parameterProvider.getCachedMaxClientLagInMs()); } @@ -208,7 +192,7 @@ public void testMaxClientLagEnabledMinuteTimeUnit() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "1 minute"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, isIcebergMode); Assert.assertEquals(60000, parameterProvider.getCachedMaxClientLagInMs()); } @@ -217,7 +201,7 @@ public void testMaxClientLagEnabledMinuteTimeUnitPluralTimeUnit() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "2 minutes"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, isIcebergMode); Assert.assertEquals(120000, parameterProvider.getCachedMaxClientLagInMs()); } @@ -225,7 +209,7 @@ public void testMaxClientLagEnabledMinuteTimeUnitPluralTimeUnit() { public void testMaxClientLagEnabledDefaultValue() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, isIcebergMode); Assert.assertEquals( ParameterProvider.MAX_CLIENT_LAG_DEFAULT, parameterProvider.getCachedMaxClientLagInMs()); } @@ -235,7 +219,7 @@ public void testMaxClientLagEnabledDefaultUnit() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "3000"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, isIcebergMode); Assert.assertEquals(3000, parameterProvider.getCachedMaxClientLagInMs()); } @@ -244,7 +228,7 @@ public void testMaxClientLagEnabledLongInput() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); 
parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, 3000L); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, isIcebergMode); Assert.assertEquals(3000, parameterProvider.getCachedMaxClientLagInMs()); } @@ -253,7 +237,7 @@ public void testMaxClientLagEnabledMissingUnitTimeUnitSupplied() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, " year"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, isIcebergMode); try { parameterProvider.getCachedMaxClientLagInMs(); Assert.fail("Should not have succeeded"); @@ -267,7 +251,7 @@ public void testMaxClientLagEnabledInvalidTimeUnit() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "1 year"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, isIcebergMode); try { parameterProvider.getCachedMaxClientLagInMs(); Assert.fail("Should not have succeeded"); @@ -281,7 +265,7 @@ public void testMaxClientLagEnabledInvalidUnit() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "banana minute"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, isIcebergMode); try { parameterProvider.getCachedMaxClientLagInMs(); Assert.fail("Should not have succeeded"); @@ -295,7 +279,7 @@ public void testMaxClientLagEnabledThresholdBelow() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "0 second"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, isIcebergMode); try { parameterProvider.getCachedMaxClientLagInMs(); Assert.fail("Should not have succeeded"); @@ -309,7 +293,7 @@ public void testMaxClientLagEnabledThresholdAbove() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "11 minutes"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, isIcebergMode); try { parameterProvider.getCachedMaxClientLagInMs(); Assert.fail("Should not have succeeded"); @@ -323,7 +307,7 @@ public void testMaxClientLagEnableEmptyInput() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, ""); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, isIcebergMode); try { parameterProvider.getCachedMaxClientLagInMs(); Assert.fail("Should not have succeeded"); @@ -333,24 +317,32 @@ public void testMaxClientLagEnableEmptyInput() { } @Test - public void testMaxChunksInBlobAndRegistrationRequest() { + public void testMaxChunksInBlob() { + Properties prop = new 
Properties(); + Map parameterMap = getStartingParameterMap(); + parameterMap.put("max_chunks_in_blob", 1); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, isIcebergMode); + Assert.assertEquals(1, parameterProvider.getMaxChunksInBlob()); + + if (isIcebergMode) { + SFException e = + Assert.assertThrows( + SFException.class, + () -> { + parameterMap.put("max_chunks_in_blob", 100); + new ParameterProvider(parameterMap, prop, isIcebergMode); + }); + Assert.assertEquals(e.getVendorCode(), ErrorCode.INVALID_CONFIG_PARAMETER.getMessageCode()); + } + } + + @Test + public void testMaxChunksInRegistrationRequest() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); - parameterMap.put("max_chunks_in_blob_and_registration_request", 1); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); - Assert.assertEquals(1, parameterProvider.getMaxChunksInBlobAndRegistrationRequest()); - - parameterProvider = new ParameterProvider(parameterMap, prop, true); - Assert.assertEquals(1, parameterProvider.getMaxChunksInBlobAndRegistrationRequest()); - - SFException e = - Assert.assertThrows( - SFException.class, - () -> { - parameterMap.put("max_chunks_in_blob_and_registration_request", 100); - new ParameterProvider(parameterMap, prop, true); - }); - Assert.assertEquals(e.getVendorCode(), ErrorCode.INVALID_CONFIG_PARAMETER.getMessageCode()); + parameterMap.put("max_chunks_in_registration_request", 1); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, isIcebergMode); + Assert.assertEquals(1, parameterProvider.getMaxChunksInRegistrationRequest()); } @Test @@ -361,7 +353,8 @@ public void testValidCompressionAlgorithmsAndWithUppercaseLowerCase() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM, v); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); + ParameterProvider parameterProvider = + new ParameterProvider(parameterMap, prop, isIcebergMode); Assert.assertEquals( Constants.BdecParquetCompression.GZIP, parameterProvider.getBdecParquetCompressionAlgorithm()); @@ -372,7 +365,8 @@ public void testValidCompressionAlgorithmsAndWithUppercaseLowerCase() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM, v); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); + ParameterProvider parameterProvider = + new ParameterProvider(parameterMap, prop, isIcebergMode); Assert.assertEquals( Constants.BdecParquetCompression.ZSTD, parameterProvider.getBdecParquetCompressionAlgorithm()); @@ -384,7 +378,7 @@ public void testInvalidCompressionAlgorithm() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM, "invalid_comp"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, isIcebergMode); try { parameterProvider.getBdecParquetCompressionAlgorithm(); Assert.fail("Should not have succeeded"); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java index 
9d52ccf4e..c9ca86b35 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java @@ -86,10 +86,9 @@ public class SnowflakeStreamingIngestClientTest { SnowflakeStreamingIngestChannelInternal channel3; SnowflakeStreamingIngestChannelInternal channel4; - // TODO: Add IcebergMode = True after Streaming to Iceberg is supported. @Parameterized.Parameters(name = "isIcebergMode: {0}") public static Object[] isIcebergMode() { - return new Object[] {false}; + return new Object[] {false, true}; } @Parameterized.Parameter public boolean isIcebergMode; From c9a45ae842693a4c8bef39668c21a539848ab63f Mon Sep 17 00:00:00 2001 From: Alec Huang Date: Tue, 30 Jul 2024 15:57:53 -0700 Subject: [PATCH 4/7] fix FlushService merge --- .../ingest/streaming/internal/FlushService.java | 8 ++++---- .../ingest/streaming/internal/FlushServiceTest.java | 13 ++++++++++--- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java index d4fe97ec6..6f5296209 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java @@ -192,12 +192,12 @@ private CompletableFuture distributeFlush( /** If tracing is enabled, print always else, check if it needs flush or is forceful. */ private void logFlushTask(boolean isForce, Set tablesToFlush, long flushStartTime) { boolean isNeedFlush = - this.owningClient.getParameterProvider().getMaxChunksInBlobAndRegistrationRequest() == 1 + this.owningClient.getParameterProvider().getMaxChunksInBlob() == 1 ? 
tablesToFlush.stream().anyMatch(channelCache::getNeedFlush)
            : this.isNeedFlush;
    long currentTime = System.currentTimeMillis();
    final String logInfo;
-    if (this.owningClient.getParameterProvider().getMaxChunksInBlobAndRegistrationRequest() == 1) {
+    if (this.owningClient.getParameterProvider().getMaxChunksInBlob() == 1) {
      logInfo =
          String.format(
              "Tables=[%s]",
@@ -272,7 +272,7 @@ CompletableFuture flush(boolean isForce) {
        this.owningClient.getParameterProvider().getCachedMaxClientLagInMs();

    final Set tablesToFlush;
-    if (this.owningClient.getParameterProvider().getMaxChunksInBlobAndRegistrationRequest() == 1) {
+    if (this.owningClient.getParameterProvider().getMaxChunksInBlob() == 1) {
      tablesToFlush =
          this.channelCache.keySet().stream()
              .filter(
@@ -694,7 +694,7 @@ void shutdown() throws InterruptedException {
   */
  void setNeedFlush(String fullyQualifiedTableName) {
    this.isNeedFlush = true;
-    if (this.owningClient.getParameterProvider().getMaxChunksInBlobAndRegistrationRequest() == 1) {
+    if (this.owningClient.getParameterProvider().getMaxChunksInBlob() == 1) {
      this.channelCache.setNeedFlush(fullyQualifiedTableName, true);
    }
  }
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java
index 8048020d5..76cf7551d 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java
@@ -122,7 +122,7 @@ private abstract static class TestContext implements AutoCloseable {
    }

    void setParameterOverride(Map parameterOverride) {
-      this.parameterProvider = new ParameterProvider(parameterOverride, null);
+      this.parameterProvider = new ParameterProvider(parameterOverride, null, isIcebergMode);
    }

    ChannelData flushChannel(String name) {
@@ -455,10 +455,17 @@ public void testGetFilePath() {
  }

  @Test
-  public void testFlush() throws Exception {
+  public void testInterleaveFlush() throws Exception {
+    if (isIcebergMode) {
+      // Interleaved blob is not supported in Iceberg mode
+      return;
+    }
    int numChannels = 4;
    Long maxLastFlushTime = Long.MAX_VALUE - 1000L; // -1000L to avoid jitter overflow
    TestContext>> testContext = testContextFactory.create();
+    testContext.setParameterOverride(
+        Collections.singletonMap(
+            ParameterProvider.MAX_CHUNKS_IN_BLOB, ParameterProvider.MAX_CHUNKS_IN_BLOB_DEFAULT));
    addChannel1(testContext);
    FlushService flushService = testContext.flushService;
    ChannelCache channelCache = testContext.channelCache;
@@ -523,7 +530,7 @@ public void testNonInterleaveFlush() throws ExecutionException, InterruptedExcep
    ChannelCache channelCache = testContext.channelCache;
    Mockito.when(flushService.isTestMode()).thenReturn(false);
    testContext.setParameterOverride(
-        Collections.singletonMap(ParameterProvider.MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST, 1));
+        Collections.singletonMap(ParameterProvider.MAX_CHUNKS_IN_BLOB, 1));

    // Test need flush
    IntStream.range(0, numChannels)

From 780ded21da8f139ed6e8fc6ea1374e5d308cf581 Mon Sep 17 00:00:00 2001
From: Alec Huang
Date: Wed, 31 Jul 2024 16:59:06 -0700
Subject: [PATCH 5/7] Add parameter check / Remove default parameter value from
 parameter maps

---
 .../ingest/utils/ParameterProvider.java       | 51 +++++++++++--------
 .../internal/ParameterProviderTest.java       | 18 ++++++-
 2 files changed, 47 insertions(+), 22 deletions(-)

diff --git a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java
index 42dbd8532..bd769f6cc 100644
--- a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java
+++ b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java
@@ -83,6 +83,9 @@ public class ParameterProvider {
  /** Map of parameter name to parameter value. This will be set by client/configure API Call. */
  private final Map parameterMap = new HashMap<>();

+  /* Iceberg mode flag */
+  private final boolean isIcebergMode;
+
  // Cached buffer flush interval - avoid parsing each time for quick lookup
  private Long cachedBufferFlushIntervalMs = -1L;

@@ -97,6 +100,7 @@ public class ParameterProvider {
   */
  public ParameterProvider(
      Map parameterOverrides, Properties props, boolean isIcebergMode) {
+    this.isIcebergMode = isIcebergMode;
    this.setParameterMap(parameterOverrides, props, isIcebergMode);
  }

@@ -111,26 +115,19 @@ private void checkAndUpdate(
      Map parameterOverrides,
      Properties props,
      boolean enforceDefault) {
-    if (parameterOverrides != null && props != null) {
-      this.parameterMap.put(
-          key, parameterOverrides.getOrDefault(key, props.getOrDefault(key, defaultValue)));
-    } else if (parameterOverrides != null) {
-      this.parameterMap.put(key, parameterOverrides.getOrDefault(key, defaultValue));
-    } else if (props != null) {
-      this.parameterMap.put(key, props.getOrDefault(key, defaultValue));
-    } else {
-      this.parameterMap.put(key, defaultValue);
+    if (props != null && props.containsKey(key)) {
+      this.parameterMap.put(key, props.get(key));
+    }
+    if (parameterOverrides != null && parameterOverrides.containsKey(key)) {
+      this.parameterMap.put(key, parameterOverrides.get(key));
    }

-    if (enforceDefault) {
-      if (!this.parameterMap.getOrDefault(key, defaultValue).equals(defaultValue)) {
-        throw new SFException(
-            INVALID_CONFIG_PARAMETER,
-            String.format(
-                "The value %s for %s is not configurable, should be %s.",
-                this.parameterMap.get(key), key, defaultValue));
-      }
-      this.parameterMap.put(key, defaultValue);
+    if (enforceDefault && !this.parameterMap.getOrDefault(key, defaultValue).equals(defaultValue)) {
+      throw new SFException(
+          INVALID_CONFIG_PARAMETER,
+          String.format(
+              "The value %s for %s is not configurable, should be %s.",
+              this.parameterMap.get(key), key, defaultValue));
    }
  }

@@ -254,6 +251,14 @@ private void setParameterMap(
        parameterOverrides,
        props,
        false);
+
+    if (getMaxChunksInBlob() > getMaxChunksInRegistrationRequest()) {
+      throw new IllegalArgumentException(
+          String.format(
+              "max_chunks_in_blob (%s) should be less than or equal to"
+                  + " max_chunks_in_registration_request (%s)",
+              getMaxChunksInBlob(), getMaxChunksInRegistrationRequest()));
+    }
  }

  /** @return Longest interval in milliseconds between buffer flushes */
@@ -267,7 +272,10 @@ public long getCachedMaxClientLagInMs() {
  }

  private long getMaxClientLagInMs() {
-    Object val = this.parameterMap.getOrDefault(MAX_CLIENT_LAG, MAX_CLIENT_LAG_DEFAULT);
+    Object val =
+        this.parameterMap.getOrDefault(
+            MAX_CLIENT_LAG,
+            isIcebergMode ? MAX_CLIENT_LAG_ICEBERG_MODE_DEFAULT : MAX_CLIENT_LAG_DEFAULT);
    long computedLag;
    if (val instanceof String) {
      String maxLag = (String) val;
@@ -452,7 +460,10 @@ public long getMaxAllowedRowSizeInBytes() {

  /** @return The max number of chunks that can be put into a single BDEC. */
  public int getMaxChunksInBlob() {
-    Object val = this.parameterMap.getOrDefault(MAX_CHUNKS_IN_BLOB, MAX_CHUNKS_IN_BLOB_DEFAULT);
+    Object val =
+        this.parameterMap.getOrDefault(
+            MAX_CHUNKS_IN_BLOB,
+            isIcebergMode ? MAX_CHUNKS_IN_BLOB_ICEBERG_MODE_DEFAULT : MAX_CHUNKS_IN_BLOB_DEFAULT);
    return (val instanceof String) ? Integer.parseInt(val.toString()) : (int) val;
  }

diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java
index cd42e8faa..a0478bd8a 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java
@@ -340,9 +340,23 @@ public void testMaxChunksInBlob() {
  public void testMaxChunksInRegistrationRequest() {
    Properties prop = new Properties();
    Map parameterMap = getStartingParameterMap();
-    parameterMap.put("max_chunks_in_registration_request", 1);
+    parameterMap.put("max_chunks_in_registration_request", 101);
    ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, isIcebergMode);
-    Assert.assertEquals(1, parameterProvider.getMaxChunksInRegistrationRequest());
+    Assert.assertEquals(101, parameterProvider.getMaxChunksInRegistrationRequest());
+
+    IllegalArgumentException e =
+        Assert.assertThrows(
+            IllegalArgumentException.class,
+            () -> {
+              parameterMap.put("max_chunks_in_registration_request", 0);
+              new ParameterProvider(parameterMap, prop, isIcebergMode);
+            });
+    Assert.assertEquals(
+        e.getMessage(),
+        String.format(
+            "max_chunks_in_blob (%s) should be less than or equal to"
+                + " max_chunks_in_registration_request (%s)",
+            parameterProvider.getMaxChunksInBlob(), 0));
  }

  @Test

From 4ea179b8b7cd41cb613806eb4593e16c43201ca6 Mon Sep 17 00:00:00 2001
From: Alec Huang
Date: Fri, 9 Aug 2024 12:47:34 -0700
Subject: [PATCH 6/7] remove builder setter

---
 .../streaming/SnowflakeStreamingIngestClientFactory.java | 5 -----
 1 file changed, 5 deletions(-)

diff --git a/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClientFactory.java b/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClientFactory.java
index 6b0f057a7..20c12f019 100644
--- a/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClientFactory.java
+++ b/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClientFactory.java
@@ -54,11 +54,6 @@ public Builder setIsTestMode(boolean isTestMode) {
      return this;
    }

-    Builder setIsIcebergMode(boolean isIcebergMode) {
-      this.isIcebergMode = isIcebergMode;
-      return this;
-    }
-
    public SnowflakeStreamingIngestClient build() {
      Utils.assertStringNotNullOrEmpty("client name", this.name);
      Utils.assertNotNull("connection properties", this.prop);

From a187056c97c9d5cc3edc3cd9d22b30e38a46ddad Mon Sep 17 00:00:00 2001
From: Alec Huang
Date: Fri, 9 Aug 2024 14:48:42 -0700
Subject: [PATCH 7/7] remove change in client factory

---
 .../SnowflakeStreamingIngestClientFactory.java | 11 +----------
 1 file changed, 1 insertion(+), 10 deletions(-)

diff --git a/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClientFactory.java b/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClientFactory.java
index 20c12f019..6f0b7b764 100644
--- a/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClientFactory.java
+++ b/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClientFactory.java
@@ -31,10 +31,6 @@ public static class Builder {
    // Indicates whether it's under test mode
    private boolean isTestMode;

-    // Indicates whether it's streaming to Iceberg tables. Open channels on regular tables should
-    // fail in this mode.
- private boolean isIcebergMode; - private Builder(String name) { this.name = name; } @@ -62,12 +58,7 @@ public SnowflakeStreamingIngestClient build() { SnowflakeURL accountURL = new SnowflakeURL(prop.getProperty(Constants.ACCOUNT_URL)); return new SnowflakeStreamingIngestClientInternal<>( - this.name, - accountURL, - prop, - this.parameterOverrides, - this.isIcebergMode, - this.isTestMode); + this.name, accountURL, prop, this.parameterOverrides, false, this.isTestMode); } } }
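A note on the checkAndUpdate rework in PATCH 5/7: resolution order is now parameterOverrides over Properties over the compiled-in default, and defaults are no longer written into the parameter map; the getters fall back to them at lookup time. A condensed, standalone restatement follows. The class and method names here are illustrative only, not SDK code.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    // Illustrative sketch of the PATCH 5/7 lookup order (not SDK code):
    // a Properties entry wins over the default, and a parameterOverrides
    // entry wins over both; absent keys fall back to the default on read.
    public class ParameterPrecedenceSketch {
      static Object resolve(
          String key, Object defaultValue, Map<String, Object> overrides, Properties props) {
        Map<String, Object> parameterMap = new HashMap<>();
        if (props != null && props.containsKey(key)) {
          parameterMap.put(key, props.get(key)); // Properties entry beats the default
        }
        if (overrides != null && overrides.containsKey(key)) {
          parameterMap.put(key, overrides.get(key)); // override beats both
        }
        return parameterMap.getOrDefault(key, defaultValue);
      }

      public static void main(String[] args) {
        Properties props = new Properties();
        props.put("max_chunks_in_blob", 20);
        Map<String, Object> overrides = new HashMap<>();
        overrides.put("max_chunks_in_blob", 10);
        // Override wins over Properties, which wins over the default of 100:
        System.out.println(resolve("max_chunks_in_blob", 100, overrides, props)); // prints 10
      }
    }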
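A minimal usage sketch of the split limits from PATCH 3/7, modeled on the ManyTablesIT change above. Client construction is omitted; only the ParameterProvider keys, the three-argument constructor, and the validation behavior come from this series, and the wrapper class name is assumed.

    import java.util.Properties;
    import net.snowflake.ingest.utils.ParameterProvider;

    public class ChunkLimitSketch {
      public static void main(String[] args) {
        Properties props = new Properties();
        // The two limits are now independent knobs.
        props.put(ParameterProvider.MAX_CHUNKS_IN_BLOB, 2);
        props.put(ParameterProvider.MAX_CHUNKS_IN_REGISTRATION_REQUEST, 2);

        ParameterProvider provider = new ParameterProvider(null, props, /* isIcebergMode */ false);
        System.out.println(provider.getMaxChunksInBlob()); // 2
        System.out.println(provider.getMaxChunksInRegistrationRequest()); // 2

        // PATCH 5/7 adds the cross-check: a blob cap larger than the
        // registration cap fails construction with IllegalArgumentException,
        // and Iceberg mode pins MAX_CHUNKS_IN_BLOB to its default of 1
        // (SFException with INVALID_CONFIG_PARAMETER otherwise).
      }
    }

Splitting the two knobs lets Iceberg mode force one chunk per blob while a single registration request can still batch up to the default of 100 chunks.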