diff --git a/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClientFactory.java b/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClientFactory.java index cd6d78787..6f0b7b764 100644 --- a/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClientFactory.java +++ b/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClientFactory.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 Snowflake Computing Inc. All rights reserved. + * Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved. */ package net.snowflake.ingest.streaming; @@ -58,7 +58,7 @@ public SnowflakeStreamingIngestClient build() { SnowflakeURL accountURL = new SnowflakeURL(prop.getProperty(Constants.ACCOUNT_URL)); return new SnowflakeStreamingIngestClientInternal<>( - this.name, accountURL, prop, this.parameterOverrides, this.isTestMode); + this.name, accountURL, prop, this.parameterOverrides, false, this.isTestMode); } } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java b/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java index b88090e01..8e65ed677 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 Snowflake Computing Inc. All rights reserved. + * Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved. */ package net.snowflake.ingest.streaming.internal; @@ -61,10 +61,14 @@ class BlobBuilder { * @param blobData All the data for one blob. Assumes that all ChannelData in the inner List * belongs to the same table. 
Will error if this is not the case * @param bdecVersion version of blob + * @param encrypt If the output chunk is encrypted or not * @return {@link Blob} data */ static Blob constructBlobAndMetadata( - String filePath, List>> blobData, Constants.BdecVersion bdecVersion) + String filePath, + List>> blobData, + Constants.BdecVersion bdecVersion, + boolean encrypt) throws IOException, NoSuchPaddingException, NoSuchAlgorithmException, InvalidAlgorithmParameterException, InvalidKeyException, IllegalBlockSizeException, BadPaddingException { @@ -83,25 +87,34 @@ static Blob constructBlobAndMetadata( flusher.serialize(channelsDataPerTable, filePath); if (!serializedChunk.channelsMetadataList.isEmpty()) { - ByteArrayOutputStream chunkData = serializedChunk.chunkData; - Pair paddedChunk = - padChunk(chunkData, Constants.ENCRYPTION_ALGORITHM_BLOCK_SIZE_BYTES); - byte[] paddedChunkData = paddedChunk.getFirst(); - int paddedChunkLength = paddedChunk.getSecond(); + final byte[] compressedChunkData; + final int chunkLength; + final int compressedChunkDataSize; - // Encrypt the compressed chunk data, the encryption key is derived using the key from - // server with the full blob path. - // We need to maintain IV as a block counter for the whole file, even interleaved, - // to align with decryption on the Snowflake query path. - // TODO: address alignment for the header SNOW-557866 - long iv = curDataSize / Constants.ENCRYPTION_ALGORITHM_BLOCK_SIZE_BYTES; - byte[] encryptedCompressedChunkData = - Cryptor.encrypt( - paddedChunkData, firstChannelFlushContext.getEncryptionKey(), filePath, iv); + if (encrypt) { + Pair paddedChunk = + padChunk(serializedChunk.chunkData, Constants.ENCRYPTION_ALGORITHM_BLOCK_SIZE_BYTES); + byte[] paddedChunkData = paddedChunk.getFirst(); + chunkLength = paddedChunk.getSecond(); + + // Encrypt the compressed chunk data, the encryption key is derived using the key from + // server with the full blob path. 
+ // We need to maintain IV as a block counter for the whole file, even interleaved, + // to align with decryption on the Snowflake query path. + // TODO: address alignment for the header SNOW-557866 + long iv = curDataSize / Constants.ENCRYPTION_ALGORITHM_BLOCK_SIZE_BYTES; + compressedChunkData = + Cryptor.encrypt( + paddedChunkData, firstChannelFlushContext.getEncryptionKey(), filePath, iv); + compressedChunkDataSize = compressedChunkData.length; + } else { + compressedChunkData = serializedChunk.chunkData.toByteArray(); + chunkLength = compressedChunkData.length; + compressedChunkDataSize = chunkLength; + } // Compute the md5 of the chunk data - String md5 = computeMD5(encryptedCompressedChunkData, paddedChunkLength); - int encryptedCompressedChunkDataSize = encryptedCompressedChunkData.length; + String md5 = computeMD5(compressedChunkData, chunkLength); // Create chunk metadata long startOffset = curDataSize; @@ -111,9 +124,9 @@ static Blob constructBlobAndMetadata( // The start offset will be updated later in BlobBuilder#build to include the blob // header .setChunkStartOffset(startOffset) - // The paddedChunkLength is used because it is the actual data size used for + // The chunkLength is used because it is the actual data size used for // decompression and md5 calculation on server side. 
- .setChunkLength(paddedChunkLength) + .setChunkLength(chunkLength) .setUncompressedChunkLength((int) serializedChunk.chunkEstimatedUncompressedSize) .setChannelList(serializedChunk.channelsMetadataList) .setChunkMD5(md5) @@ -127,21 +140,22 @@ static Blob constructBlobAndMetadata( // Add chunk metadata and data to the list chunksMetadataList.add(chunkMetadata); - chunksDataList.add(encryptedCompressedChunkData); - curDataSize += encryptedCompressedChunkDataSize; - crc.update(encryptedCompressedChunkData, 0, encryptedCompressedChunkDataSize); + chunksDataList.add(compressedChunkData); + curDataSize += compressedChunkDataSize; + crc.update(compressedChunkData, 0, compressedChunkDataSize); logger.logInfo( "Finish building chunk in blob={}, table={}, rowCount={}, startOffset={}," - + " estimatedUncompressedSize={}, paddedChunkLength={}, encryptedCompressedSize={}," - + " bdecVersion={}", + + " estimatedUncompressedSize={}, chunkLength={}, compressedSize={}," + + " encryption={}, bdecVersion={}", filePath, firstChannelFlushContext.getFullyQualifiedTableName(), serializedChunk.rowCount, startOffset, serializedChunk.chunkEstimatedUncompressedSize, - paddedChunkLength, - encryptedCompressedChunkDataSize, + chunkLength, + compressedChunkDataSize, + encrypt, bdecVersion); } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java index 84e1a2561..6f5296209 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java @@ -192,12 +192,12 @@ private CompletableFuture distributeFlush( /** If tracing is enabled, print always else, check if it needs flush or is forceful. 
*/ private void logFlushTask(boolean isForce, Set tablesToFlush, long flushStartTime) { boolean isNeedFlush = - this.owningClient.getParameterProvider().getMaxChunksInBlobAndRegistrationRequest() == 1 + this.owningClient.getParameterProvider().getMaxChunksInBlob() == 1 ? tablesToFlush.stream().anyMatch(channelCache::getNeedFlush) : this.isNeedFlush; long currentTime = System.currentTimeMillis(); final String logInfo; - if (this.owningClient.getParameterProvider().getMaxChunksInBlobAndRegistrationRequest() == 1) { + if (this.owningClient.getParameterProvider().getMaxChunksInBlob() == 1) { logInfo = String.format( "Tables=[%s]", @@ -272,7 +272,7 @@ CompletableFuture flush(boolean isForce) { this.owningClient.getParameterProvider().getCachedMaxClientLagInMs(); final Set tablesToFlush; - if (this.owningClient.getParameterProvider().getMaxChunksInBlobAndRegistrationRequest() == 1) { + if (this.owningClient.getParameterProvider().getMaxChunksInBlob() == 1) { tablesToFlush = this.channelCache.keySet().stream() .filter( @@ -412,15 +412,13 @@ void distributeFlushTasks(Set tablesToFlush) { channelsDataPerTable.addAll(leftoverChannelsDataPerTable); leftoverChannelsDataPerTable.clear(); } else if (blobData.size() - >= this.owningClient - .getParameterProvider() - .getMaxChunksInBlobAndRegistrationRequest()) { + >= this.owningClient.getParameterProvider().getMaxChunksInBlob()) { // Create a new blob if the current one already contains max allowed number of chunks logger.logInfo( "Max allowed number of chunks in the current blob reached. 
chunkCount={}" + " maxChunkCount={} currentBlobPath={}", blobData.size(), - this.owningClient.getParameterProvider().getMaxChunksInBlobAndRegistrationRequest(), + this.owningClient.getParameterProvider().getMaxChunksInBlob(), blobPath); break; } else { @@ -599,7 +597,12 @@ BlobMetadata buildAndUpload( Timer.Context buildContext = Utils.createTimerContext(this.owningClient.buildLatency); // Construct the blob along with the metadata of the blob - BlobBuilder.Blob blob = BlobBuilder.constructBlobAndMetadata(blobPath, blobData, bdecVersion); + BlobBuilder.Blob blob = + BlobBuilder.constructBlobAndMetadata( + blobPath, + blobData, + bdecVersion, + this.owningClient.getInternalParameterProvider().getEnableChunkEncryption()); blob.blobStats.setBuildDurationMs(buildContext); @@ -691,7 +694,7 @@ void shutdown() throws InterruptedException { */ void setNeedFlush(String fullyQualifiedTableName) { this.isNeedFlush = true; - if (this.owningClient.getParameterProvider().getMaxChunksInBlobAndRegistrationRequest() == 1) { + if (this.owningClient.getParameterProvider().getMaxChunksInBlob() == 1) { this.channelCache.setNeedFlush(fullyQualifiedTableName, true); } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/InternalParameterProvider.java b/src/main/java/net/snowflake/ingest/streaming/internal/InternalParameterProvider.java new file mode 100644 index 000000000..46a99b671 --- /dev/null +++ b/src/main/java/net/snowflake/ingest/streaming/internal/InternalParameterProvider.java @@ -0,0 +1,20 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. 
+ */ + +package net.snowflake.ingest.streaming.internal; + +/** A class to provide non-configurable constants that depend on Iceberg or non-Iceberg mode */ +class InternalParameterProvider { + private final boolean isIcebergMode; + + InternalParameterProvider(boolean isIcebergMode) { + this.isIcebergMode = isIcebergMode; + } + + boolean getEnableChunkEncryption() { + // When in Iceberg mode, chunk encryption is disabled since Iceberg + // mode does not need client-side encryption. Otherwise, it is enabled. + return !isIcebergMode; + } +} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java index 080b4f87d..cec36cfbf 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java @@ -96,6 +96,9 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea // Snowflake role for the client to use private String role; + // Provides constant values which are determined by the Iceberg or non-Iceberg mode + private final InternalParameterProvider internalParameterProvider; + // Http client to send HTTP requests to Snowflake private final CloseableHttpClient httpClient; @@ -111,6 +114,9 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea // Indicates whether the client has closed private volatile boolean isClosed; + // Indicates whether the client is streaming to Iceberg tables + private final boolean isIcebergMode; + // Indicates whether the client is under test mode private final boolean isTestMode; @@ -146,6 +152,7 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea * @param prop connection properties * @param httpClient http client for sending request * @param isTestMode whether we're under test 
mode + * @param isIcebergMode whether we're streaming to Iceberg tables * @param requestBuilder http request builder * @param parameterOverrides parameters we override in case we want to set different values */ @@ -154,13 +161,16 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea SnowflakeURL accountURL, Properties prop, CloseableHttpClient httpClient, + boolean isIcebergMode, boolean isTestMode, RequestBuilder requestBuilder, Map parameterOverrides) { - this.parameterProvider = new ParameterProvider(parameterOverrides, prop); + this.parameterProvider = new ParameterProvider(parameterOverrides, prop, isIcebergMode); + this.internalParameterProvider = new InternalParameterProvider(isIcebergMode); this.name = name; String accountName = accountURL == null ? null : accountURL.getAccount(); + this.isIcebergMode = isIcebergMode; this.isTestMode = isTestMode; this.httpClient = httpClient == null ? HttpUtil.getHttpClient(accountName) : httpClient; this.channelCache = new ChannelCache<>(); @@ -250,6 +260,7 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea * @param accountURL Snowflake account url * @param prop connection properties * @param parameterOverrides map of parameters to override for this client + * @param isIcebergMode whether we're streaming to Iceberg tables * @param isTestMode indicates whether it's under test mode */ public SnowflakeStreamingIngestClientInternal( @@ -257,16 +268,17 @@ public SnowflakeStreamingIngestClientInternal( SnowflakeURL accountURL, Properties prop, Map parameterOverrides, + boolean isIcebergMode, boolean isTestMode) { - this(name, accountURL, prop, null, isTestMode, null, parameterOverrides); + this(name, accountURL, prop, null, isIcebergMode, isTestMode, null, parameterOverrides); } /*** Constructor for TEST ONLY * * @param name the name of the client */ - SnowflakeStreamingIngestClientInternal(String name) { - this(name, null, null, null, true, null, new HashMap<>()); + 
SnowflakeStreamingIngestClientInternal(String name, boolean isIcebergMode) { + this(name, null, null, null, isIcebergMode, true, null, new HashMap<>()); } // TESTING ONLY - inject the request builder @@ -495,21 +507,20 @@ List> partitionBlobListForRegistrationRequest(List> result = new ArrayList<>(); List currentBatch = new ArrayList<>(); int chunksInCurrentBatch = 0; - int maxChunksInBlobAndRegistrationRequest = - parameterProvider.getMaxChunksInBlobAndRegistrationRequest(); + int maxChunksInRegistrationRequest = parameterProvider.getMaxChunksInRegistrationRequest(); for (BlobMetadata blob : blobs) { - if (blob.getChunks().size() > maxChunksInBlobAndRegistrationRequest) { + if (blob.getChunks().size() > maxChunksInRegistrationRequest) { throw new SFException( ErrorCode.INTERNAL_ERROR, String.format( "Incorrectly generated blob detected - number of chunks in the blob is larger than" + " the max allowed number of chunks. Please report this bug to Snowflake." + " bdec=%s chunkCount=%d maxAllowedChunkCount=%d", - blob.getPath(), blob.getChunks().size(), maxChunksInBlobAndRegistrationRequest)); + blob.getPath(), blob.getChunks().size(), maxChunksInRegistrationRequest)); } - if (chunksInCurrentBatch + blob.getChunks().size() > maxChunksInBlobAndRegistrationRequest) { + if (chunksInCurrentBatch + blob.getChunks().size() > maxChunksInRegistrationRequest) { // Newly added BDEC file would exceed the max number of chunks in a single registration // request. 
We put chunks collected so far into the result list and create a new batch with // the current blob @@ -875,6 +886,15 @@ ParameterProvider getParameterProvider() { return parameterProvider; } + /** + * Get InternalParameterProvider with internal parameters + * + * @return {@link InternalParameterProvider} used by the client + */ + InternalParameterProvider getInternalParameterProvider() { + return internalParameterProvider; + } + /** * Set refresh token, this method is for refresh token renewal without requiring to restart * client. This method only works when the authorization type is OAuth diff --git a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java index 9d9db5356..bd769f6cc 100644 --- a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java +++ b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java @@ -1,5 +1,11 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. 
+ */ + package net.snowflake.ingest.utils; +import static net.snowflake.ingest.utils.ErrorCode.INVALID_CONFIG_PARAMETER; + import java.util.HashMap; import java.util.Map; import java.util.Properties; @@ -31,8 +37,9 @@ public class ParameterProvider { public static final String MAX_CHUNK_SIZE_IN_BYTES = "MAX_CHUNK_SIZE_IN_BYTES".toLowerCase(); public static final String MAX_ALLOWED_ROW_SIZE_IN_BYTES = "MAX_ALLOWED_ROW_SIZE_IN_BYTES".toLowerCase(); - public static final String MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST = - "MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST".toLowerCase(); + public static final String MAX_CHUNKS_IN_BLOB = "MAX_CHUNKS_IN_BLOB".toLowerCase(); + public static final String MAX_CHUNKS_IN_REGISTRATION_REQUEST = + "MAX_CHUNKS_IN_REGISTRATION_REQUEST".toLowerCase(); public static final String MAX_CLIENT_LAG = "MAX_CLIENT_LAG".toLowerCase(); @@ -55,15 +62,20 @@ public class ParameterProvider { // Lag related parameters public static final long MAX_CLIENT_LAG_DEFAULT = 1000; // 1 second + public static final long MAX_CLIENT_LAG_ICEBERG_MODE_DEFAULT = 30000; // 30 seconds static final long MAX_CLIENT_LAG_MS_MIN = TimeUnit.SECONDS.toMillis(1); static final long MAX_CLIENT_LAG_MS_MAX = TimeUnit.MINUTES.toMillis(10); public static final long MAX_ALLOWED_ROW_SIZE_IN_BYTES_DEFAULT = 64 * 1024 * 1024; // 64 MB - public static final int MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_DEFAULT = 100; + public static final int MAX_CHUNKS_IN_BLOB_DEFAULT = 100; + public static final int MAX_CHUNKS_IN_REGISTRATION_REQUEST_DEFAULT = 100; public static final Constants.BdecParquetCompression BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT = Constants.BdecParquetCompression.GZIP; + /* Iceberg mode parameters: When streaming to Iceberg mode, different default parameters are required because it generates Parquet files instead of BDEC files. 
*/ + public static final int MAX_CHUNKS_IN_BLOB_ICEBERG_MODE_DEFAULT = 1; + /* Parameter that enables using internal Parquet buffers for buffering of rows before serializing. It reduces memory consumption compared to using Java Objects for buffering.*/ public static final boolean ENABLE_PARQUET_INTERNAL_BUFFERING_DEFAULT = false; @@ -71,6 +83,9 @@ public class ParameterProvider { /** Map of parameter name to parameter value. This will be set by client/configure API Call. */ private final Map parameterMap = new HashMap<>(); + /* Iceberg mode flag */ + private final boolean isIcebergMode; + // Cached buffer flush interval - avoid parsing each time for quick lookup private Long cachedBufferFlushIntervalMs = -1L; @@ -80,25 +95,39 @@ public class ParameterProvider { * * @param parameterOverrides Map of parameter name to value * @param props Properties from profile file + * @param isIcebergMode If the provided parameters need to be verified and modified to meet + * Iceberg mode */ - public ParameterProvider(Map parameterOverrides, Properties props) { - this.setParameterMap(parameterOverrides, props); + public ParameterProvider( + Map parameterOverrides, Properties props, boolean isIcebergMode) { + this.isIcebergMode = isIcebergMode; + this.setParameterMap(parameterOverrides, props, isIcebergMode); } /** Empty constructor for tests */ - public ParameterProvider() { - this(null, null); + public ParameterProvider(boolean isIcebergMode) { + this(null, null, isIcebergMode); } - private void updateValue( - String key, Object defaultValue, Map parameterOverrides, Properties props) { - if (parameterOverrides != null && props != null) { - this.parameterMap.put( - key, parameterOverrides.getOrDefault(key, props.getOrDefault(key, defaultValue))); - } else if (parameterOverrides != null) { - this.parameterMap.put(key, parameterOverrides.getOrDefault(key, defaultValue)); - } else if (props != null) { - this.parameterMap.put(key, props.getOrDefault(key, defaultValue)); + private void 
checkAndUpdate( + String key, + Object defaultValue, + Map parameterOverrides, + Properties props, + boolean enforceDefault) { + if (props != null && props.containsKey(key)) { + this.parameterMap.put(key, props.get(key)); + } + if (parameterOverrides != null && parameterOverrides.containsKey(key)) { + this.parameterMap.put(key, parameterOverrides.get(key)); + } + + if (enforceDefault && !this.parameterMap.getOrDefault(key, defaultValue).equals(defaultValue)) { + throw new SFException( + INVALID_CONFIG_PARAMETER, + String.format( + "The value %s for %s is not configurable, should be %s.", + this.parameterMap.get(key), key, defaultValue)); } } @@ -107,8 +136,11 @@ private void updateValue( * * @param parameterOverrides Map of parameter name -> value * @param props Properties file provided to client constructor + * @param isIcebergMode If the provided parameters need to be verified and modified to meet + * Iceberg mode */ - private void setParameterMap(Map parameterOverrides, Properties props) { + private void setParameterMap( + Map parameterOverrides, Properties props, boolean isIcebergMode) { // BUFFER_FLUSH_INTERVAL_IN_MILLIS is deprecated and disallowed if ((parameterOverrides != null && parameterOverrides.containsKey(BUFFER_FLUSH_INTERVAL_IN_MILLIS)) @@ -119,75 +151,114 @@ private void setParameterMap(Map parameterOverrides, Properties BUFFER_FLUSH_INTERVAL_IN_MILLIS, MAX_CLIENT_LAG)); } - this.updateValue( + this.checkAndUpdate( BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS, BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS_DEFAULT, parameterOverrides, - props); + props, + false); - this.updateValue( + this.checkAndUpdate( INSERT_THROTTLE_INTERVAL_IN_MILLIS, INSERT_THROTTLE_INTERVAL_IN_MILLIS_DEFAULT, parameterOverrides, - props); + props, + false); - this.updateValue( + this.checkAndUpdate( INSERT_THROTTLE_THRESHOLD_IN_PERCENTAGE, INSERT_THROTTLE_THRESHOLD_IN_PERCENTAGE_DEFAULT, parameterOverrides, - props); + props, + false); - this.updateValue( + this.checkAndUpdate( 
INSERT_THROTTLE_THRESHOLD_IN_BYTES, INSERT_THROTTLE_THRESHOLD_IN_BYTES_DEFAULT, parameterOverrides, - props); + props, + false); - this.updateValue( + this.checkAndUpdate( ENABLE_SNOWPIPE_STREAMING_METRICS, SNOWPIPE_STREAMING_METRICS_DEFAULT, parameterOverrides, - props); + props, + false); - this.updateValue(BLOB_FORMAT_VERSION, BLOB_FORMAT_VERSION_DEFAULT, parameterOverrides, props); + this.checkAndUpdate( + BLOB_FORMAT_VERSION, BLOB_FORMAT_VERSION_DEFAULT, parameterOverrides, props, false); getBlobFormatVersion(); // to verify parsing the configured value - this.updateValue(IO_TIME_CPU_RATIO, IO_TIME_CPU_RATIO_DEFAULT, parameterOverrides, props); + this.checkAndUpdate( + IO_TIME_CPU_RATIO, IO_TIME_CPU_RATIO_DEFAULT, parameterOverrides, props, false); - this.updateValue( + this.checkAndUpdate( BLOB_UPLOAD_MAX_RETRY_COUNT, BLOB_UPLOAD_MAX_RETRY_COUNT_DEFAULT, parameterOverrides, - props); + props, + false); - this.updateValue( - MAX_MEMORY_LIMIT_IN_BYTES, MAX_MEMORY_LIMIT_IN_BYTES_DEFAULT, parameterOverrides, props); + this.checkAndUpdate( + MAX_MEMORY_LIMIT_IN_BYTES, + MAX_MEMORY_LIMIT_IN_BYTES_DEFAULT, + parameterOverrides, + props, + false); - this.updateValue( + this.checkAndUpdate( ENABLE_PARQUET_INTERNAL_BUFFERING, ENABLE_PARQUET_INTERNAL_BUFFERING_DEFAULT, parameterOverrides, - props); + props, + false); + + this.checkAndUpdate( + MAX_CHANNEL_SIZE_IN_BYTES, + MAX_CHANNEL_SIZE_IN_BYTES_DEFAULT, + parameterOverrides, + props, + false); - this.updateValue( - MAX_CHANNEL_SIZE_IN_BYTES, MAX_CHANNEL_SIZE_IN_BYTES_DEFAULT, parameterOverrides, props); + this.checkAndUpdate( + MAX_CHUNK_SIZE_IN_BYTES, MAX_CHUNK_SIZE_IN_BYTES_DEFAULT, parameterOverrides, props, false); - this.updateValue( - MAX_CHUNK_SIZE_IN_BYTES, MAX_CHUNK_SIZE_IN_BYTES_DEFAULT, parameterOverrides, props); + this.checkAndUpdate( + MAX_CLIENT_LAG, + isIcebergMode ? 
MAX_CLIENT_LAG_ICEBERG_MODE_DEFAULT : MAX_CLIENT_LAG_DEFAULT, + parameterOverrides, + props, + false); - this.updateValue(MAX_CLIENT_LAG, MAX_CLIENT_LAG_DEFAULT, parameterOverrides, props); + this.checkAndUpdate( + MAX_CHUNKS_IN_BLOB, + isIcebergMode ? MAX_CHUNKS_IN_BLOB_ICEBERG_MODE_DEFAULT : MAX_CHUNKS_IN_BLOB_DEFAULT, + parameterOverrides, + props, + isIcebergMode); - this.updateValue( - MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST, - MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_DEFAULT, + this.checkAndUpdate( + MAX_CHUNKS_IN_REGISTRATION_REQUEST, + MAX_CHUNKS_IN_REGISTRATION_REQUEST_DEFAULT, parameterOverrides, - props); + props, + false); - this.updateValue( + this.checkAndUpdate( BDEC_PARQUET_COMPRESSION_ALGORITHM, BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT, parameterOverrides, - props); + props, + false); + + if (getMaxChunksInBlob() > getMaxChunksInRegistrationRequest()) { + throw new IllegalArgumentException( + String.format( + "max_chunks_in_blob (%s) should be less than or equal to" + + " max_chunks_in_registration_request (%s)", + getMaxChunksInBlob(), getMaxChunksInRegistrationRequest())); + } } /** @return Longest interval in milliseconds between buffer flushes */ @@ -201,7 +272,10 @@ public long getCachedMaxClientLagInMs() { } private long getMaxClientLagInMs() { - Object val = this.parameterMap.getOrDefault(MAX_CLIENT_LAG, MAX_CLIENT_LAG_DEFAULT); + Object val = + this.parameterMap.getOrDefault( + MAX_CLIENT_LAG, + isIcebergMode ? MAX_CLIENT_LAG_ICEBERG_MODE_DEFAULT : MAX_CLIENT_LAG_DEFAULT); long computedLag; if (val instanceof String) { String maxLag = (String) val; @@ -384,15 +458,20 @@ public long getMaxAllowedRowSizeInBytes() { return (val instanceof String) ? Long.parseLong(val.toString()) : (long) val; } - /** - * @return The max number of chunks that can be put into a single BDEC or blob registration - * request. 
- */ - public int getMaxChunksInBlobAndRegistrationRequest() { + /** @return The max number of chunks that can be put into a single BDEC. */ + public int getMaxChunksInBlob() { + Object val = + this.parameterMap.getOrDefault( + MAX_CHUNKS_IN_BLOB, + isIcebergMode ? MAX_CHUNKS_IN_BLOB_ICEBERG_MODE_DEFAULT : MAX_CHUNKS_IN_BLOB_DEFAULT); + return (val instanceof String) ? Integer.parseInt(val.toString()) : (int) val; + } + + /** @return The max number of chunks that can be put into a single blob registration request. */ + public int getMaxChunksInRegistrationRequest() { Object val = this.parameterMap.getOrDefault( - MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST, - MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_DEFAULT); + MAX_CHUNKS_IN_REGISTRATION_REQUEST, MAX_CHUNKS_IN_REGISTRATION_REQUEST_DEFAULT); return (val instanceof String) ? Integer.parseInt(val.toString()) : (int) val; } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java index e220aec79..17d90f458 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. 
+ */ + package net.snowflake.ingest.streaming.internal; import java.io.ByteArrayOutputStream; @@ -13,9 +17,18 @@ import org.apache.parquet.schema.MessageType; import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.mockito.Mockito; +@RunWith(Parameterized.class) public class BlobBuilderTest { + @Parameterized.Parameters(name = "encrypt: {0}") + public static Object[] encrypt() { + return new Object[] {false, true}; + } + + @Parameterized.Parameter public boolean encrypt; @Test public void testSerializationErrors() throws Exception { @@ -23,18 +36,21 @@ public void testSerializationErrors() throws Exception { BlobBuilder.constructBlobAndMetadata( "a.bdec", Collections.singletonList(createChannelDataPerTable(1, false)), - Constants.BdecVersion.THREE); + Constants.BdecVersion.THREE, + encrypt); BlobBuilder.constructBlobAndMetadata( "a.bdec", Collections.singletonList(createChannelDataPerTable(1, true)), - Constants.BdecVersion.THREE); + Constants.BdecVersion.THREE, + encrypt); // Construction fails if metadata contains 0 rows and data 1 row try { BlobBuilder.constructBlobAndMetadata( "a.bdec", Collections.singletonList(createChannelDataPerTable(0, false)), - Constants.BdecVersion.THREE); + Constants.BdecVersion.THREE, + encrypt); Assert.fail("Should not pass enableParquetInternalBuffering=false"); } catch (SFException e) { Assert.assertEquals(ErrorCode.INTERNAL_ERROR.getMessageCode(), e.getVendorCode()); @@ -52,7 +68,8 @@ public void testSerializationErrors() throws Exception { BlobBuilder.constructBlobAndMetadata( "a.bdec", Collections.singletonList(createChannelDataPerTable(0, true)), - Constants.BdecVersion.THREE); + Constants.BdecVersion.THREE, + encrypt); Assert.fail("Should not pass enableParquetInternalBuffering=true"); } catch (SFException e) { Assert.assertEquals(ErrorCode.INTERNAL_ERROR.getMessageCode(), e.getVendorCode()); diff --git 
a/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java index db1d737ba..72e4f8c2f 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java @@ -13,8 +13,19 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +@RunWith(Parameterized.class) public class ChannelCacheTest { + + @Parameterized.Parameters(name = "isIcebergMode: {0}") + public static Object[] isIcebergMode() { + return new Object[] {false, true}; + } + + @Parameterized.Parameter public static boolean isIcebergMode; + ChannelCache cache; SnowflakeStreamingIngestClientInternal client; SnowflakeStreamingIngestChannelInternal channel1; @@ -28,7 +39,7 @@ public class ChannelCacheTest { @Before public void setup() { cache = new ChannelCache<>(); - client = new SnowflakeStreamingIngestClientInternal<>("client"); + client = new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode); channel1 = new SnowflakeStreamingIngestChannelInternal<>( "channel1", diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java index 5ab500f9e..76cf7551d 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. 
+ */ + package net.snowflake.ingest.streaming.internal; import static net.snowflake.ingest.utils.Constants.BLOB_CHECKSUM_SIZE_IN_BYTES; @@ -50,12 +54,23 @@ import net.snowflake.ingest.utils.SFException; import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatchers; import org.mockito.Mockito; import org.mockito.stubbing.Answer; +@RunWith(Parameterized.class) public class FlushServiceTest { + + @Parameterized.Parameters(name = "isIcebergMode: {0}") + public static Object[] isIcebergMode() { + return new Object[] {false, true}; + } + + @Parameterized.Parameter public static boolean isIcebergMode; + public FlushServiceTest() { this.testContextFactory = ParquetTestContext.createFactory(); } @@ -89,9 +104,12 @@ private abstract static class TestContext implements AutoCloseable { TestContext() { storage = Mockito.mock(StreamingIngestStorage.class); - parameterProvider = new ParameterProvider(); + parameterProvider = new ParameterProvider(isIcebergMode); + InternalParameterProvider internalParameterProvider = + new InternalParameterProvider(isIcebergMode); client = Mockito.mock(SnowflakeStreamingIngestClientInternal.class); Mockito.when(client.getParameterProvider()).thenReturn(parameterProvider); + Mockito.when(client.getInternalParameterProvider()).thenReturn(internalParameterProvider); storageManager = Mockito.spy(new InternalStageManager<>(true, "role", "client", null)); Mockito.doReturn(storage).when(storageManager).getStorage(ArgumentMatchers.any()); Mockito.when(storageManager.getClientPrefix()).thenReturn("client_prefix"); @@ -104,7 +122,7 @@ private abstract static class TestContext implements AutoCloseable { } void setParameterOverride(Map parameterOverride) { - this.parameterProvider = new ParameterProvider(parameterOverride, null); + this.parameterProvider = new ParameterProvider(parameterOverride, null, isIcebergMode); } 
ChannelData flushChannel(String name) { @@ -437,10 +455,17 @@ public void testGetFilePath() { } @Test - public void testFlush() throws Exception { + public void testInterleaveFlush() throws Exception { + if (isIcebergMode) { + // Interleaved blob is not supported in iceberg mode + return; + } int numChannels = 4; Long maxLastFlushTime = Long.MAX_VALUE - 1000L; // -1000L to avoid jitter overflow TestContext>> testContext = testContextFactory.create(); + testContext.setParameterOverride( + Collections.singletonMap( + ParameterProvider.MAX_CHUNKS_IN_BLOB, ParameterProvider.MAX_CHUNKS_IN_BLOB_DEFAULT)); addChannel1(testContext); FlushService flushService = testContext.flushService; ChannelCache channelCache = testContext.channelCache; @@ -505,7 +530,7 @@ public void testNonInterleaveFlush() throws ExecutionException, InterruptedExcep ChannelCache channelCache = testContext.channelCache; Mockito.when(flushService.isTestMode()).thenReturn(false); testContext.setParameterOverride( - Collections.singletonMap(ParameterProvider.MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST, 1)); + Collections.singletonMap(ParameterProvider.MAX_CHUNKS_IN_BLOB, 1)); // Test need flush IntStream.range(0, numChannels) @@ -700,7 +725,9 @@ public void runTestBlobSplitDueToNumberOfChunks(int numberOfRows) throws Excepti Math.ceil( (double) numberOfRows / channelsPerTable - / ParameterProvider.MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_DEFAULT); + / (isIcebergMode + ? 
ParameterProvider.MAX_CHUNKS_IN_BLOB_ICEBERG_MODE_DEFAULT + : ParameterProvider.MAX_CHUNKS_IN_BLOB_DEFAULT)); final TestContext>> testContext = testContextFactory.create(); @@ -978,7 +1005,7 @@ public void testInvalidateChannels() { // Create a new Client in order to not interfere with other tests SnowflakeStreamingIngestClientInternal client = Mockito.mock(SnowflakeStreamingIngestClientInternal.class); - ParameterProvider parameterProvider = new ParameterProvider(); + ParameterProvider parameterProvider = new ParameterProvider(isIcebergMode); ChannelCache channelCache = new ChannelCache<>(); Mockito.when(client.getChannelCache()).thenReturn(channelCache); Mockito.when(client.getParameterProvider()).thenReturn(parameterProvider); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/InsertRowsBenchmarkTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/InsertRowsBenchmarkTest.java index 5b28e9c45..37e41df5a 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/InsertRowsBenchmarkTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/InsertRowsBenchmarkTest.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. 
+ */ + package net.snowflake.ingest.streaming.internal; import static java.time.ZoneOffset.UTC; @@ -11,6 +15,8 @@ import net.snowflake.ingest.utils.Utils; import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.Level; import org.openjdk.jmh.annotations.Mode; @@ -26,7 +32,14 @@ import org.openjdk.jmh.runner.options.TimeValue; @State(Scope.Thread) +@RunWith(Parameterized.class) public class InsertRowsBenchmarkTest { + @Parameterized.Parameters(name = "isIcebergMode: {0}") + public static Object[] isIcebergMode() { + return new Object[] {false, true}; + } + + @Parameterized.Parameter public boolean isIcebergMode; private SnowflakeStreamingIngestChannelInternal channel; private SnowflakeStreamingIngestClientInternal client; @@ -36,7 +49,10 @@ public class InsertRowsBenchmarkTest { @Setup(Level.Trial) public void setUpBeforeAll() { - client = new SnowflakeStreamingIngestClientInternal("client_PARQUET"); + // SNOW-1490151: Testing gaps + client = + new SnowflakeStreamingIngestClientInternal( + "client_PARQUET", isIcebergMode); channel = new SnowflakeStreamingIngestChannelInternal<>( "channel", diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/ManyTablesIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/ManyTablesIT.java index d32adfe3f..408bf66ca 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/ManyTablesIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/ManyTablesIT.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. 
+ */ + package net.snowflake.ingest.streaming.internal; import static net.snowflake.ingest.utils.Constants.ROLE; @@ -37,7 +41,8 @@ public class ManyTablesIT { @Before public void setUp() throws Exception { Properties props = TestUtils.getProperties(Constants.BdecVersion.THREE, false); - props.put(ParameterProvider.MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST, 2); + props.put(ParameterProvider.MAX_CHUNKS_IN_BLOB, 2); + props.put(ParameterProvider.MAX_CHUNKS_IN_REGISTRATION_REQUEST, 2); if (props.getProperty(ROLE).equals("DEFAULT_ROLE")) { props.setProperty(ROLE, "ACCOUNTADMIN"); } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/OAuthBasicTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/OAuthBasicTest.java index 6351f267d..688ec0c91 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/OAuthBasicTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/OAuthBasicTest.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. 
+ */ + package net.snowflake.ingest.streaming.internal; import java.util.Properties; @@ -113,8 +117,9 @@ public void testCreateOAuthClient() throws Exception { @Test public void testSetRefreshToken() throws Exception { + // SNOW-1490151: Testing gaps SnowflakeStreamingIngestClientInternal client = - new SnowflakeStreamingIngestClientInternal<>("TEST_CLIENT"); + new SnowflakeStreamingIngestClientInternal<>("TEST_CLIENT", false); MockOAuthClient mockOAuthClient = new MockOAuthClient(); OAuthManager oAuthManager = diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java index 86cece9c7..a0478bd8a 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + package net.snowflake.ingest.streaming.internal; import java.util.Arrays; @@ -6,12 +10,24 @@ import java.util.Map; import java.util.Properties; import net.snowflake.ingest.utils.Constants; +import net.snowflake.ingest.utils.ErrorCode; import net.snowflake.ingest.utils.ParameterProvider; +import net.snowflake.ingest.utils.SFException; import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +@RunWith(Parameterized.class) public class ParameterProviderTest { + @Parameterized.Parameters(name = "isIcebergMode: {0}") + public static Object[] isIcebergMode() { + return new Object[] {false, true}; + } + + @Parameterized.Parameter public boolean isIcebergMode; + private Map getStartingParameterMap() { Map parameterMap = new HashMap<>(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, 1000L); @@ -31,7 +47,7 @@ private Map getStartingParameterMap() { public void withValuesSet() { Properties prop = new Properties(); Map 
parameterMap = getStartingParameterMap(); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, isIcebergMode); Assert.assertEquals(1000L, parameterProvider.getCachedMaxClientLagInMs()); Assert.assertEquals(4L, parameterProvider.getBufferFlushCheckIntervalInMs()); @@ -54,7 +70,7 @@ public void withNullProps() { parameterMap.put(ParameterProvider.BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS, 4L); parameterMap.put(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_PERCENTAGE, 6); parameterMap.put(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_BYTES, 1024); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, null); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, null, isIcebergMode); Assert.assertEquals(3000, parameterProvider.getCachedMaxClientLagInMs()); Assert.assertEquals(4, parameterProvider.getBufferFlushCheckIntervalInMs()); @@ -72,7 +88,7 @@ public void withNullParameterMap() { props.put(ParameterProvider.BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS, 4L); props.put(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_PERCENTAGE, 6); props.put(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_BYTES, 1024); - ParameterProvider parameterProvider = new ParameterProvider(null, props); + ParameterProvider parameterProvider = new ParameterProvider(null, props, isIcebergMode); Assert.assertEquals(3000, parameterProvider.getCachedMaxClientLagInMs()); Assert.assertEquals(4, parameterProvider.getBufferFlushCheckIntervalInMs()); @@ -85,10 +101,13 @@ public void withNullParameterMap() { @Test public void withNullInputs() { - ParameterProvider parameterProvider = new ParameterProvider(null, null); + ParameterProvider parameterProvider = new ParameterProvider(null, null, isIcebergMode); Assert.assertEquals( - ParameterProvider.MAX_CLIENT_LAG_DEFAULT, parameterProvider.getCachedMaxClientLagInMs()); + isIcebergMode + ? 
ParameterProvider.MAX_CLIENT_LAG_ICEBERG_MODE_DEFAULT + : ParameterProvider.MAX_CLIENT_LAG_DEFAULT, + parameterProvider.getCachedMaxClientLagInMs()); Assert.assertEquals( ParameterProvider.BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS_DEFAULT, parameterProvider.getBufferFlushCheckIntervalInMs()); @@ -105,10 +124,13 @@ public void withNullInputs() { @Test public void withDefaultValues() { - ParameterProvider parameterProvider = new ParameterProvider(); + ParameterProvider parameterProvider = new ParameterProvider(isIcebergMode); Assert.assertEquals( - ParameterProvider.MAX_CLIENT_LAG_DEFAULT, parameterProvider.getCachedMaxClientLagInMs()); + isIcebergMode + ? ParameterProvider.MAX_CLIENT_LAG_ICEBERG_MODE_DEFAULT + : ParameterProvider.MAX_CLIENT_LAG_DEFAULT, + parameterProvider.getCachedMaxClientLagInMs()); Assert.assertEquals( ParameterProvider.BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS_DEFAULT, parameterProvider.getBufferFlushCheckIntervalInMs()); @@ -135,6 +157,14 @@ public void withDefaultValues() { Assert.assertEquals( ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT, parameterProvider.getBdecParquetCompressionAlgorithm()); + Assert.assertEquals( + isIcebergMode + ? 
ParameterProvider.MAX_CHUNKS_IN_BLOB_ICEBERG_MODE_DEFAULT + : ParameterProvider.MAX_CHUNKS_IN_BLOB_DEFAULT, + parameterProvider.getMaxChunksInBlob()); + Assert.assertEquals( + ParameterProvider.MAX_CHUNKS_IN_REGISTRATION_REQUEST_DEFAULT, + parameterProvider.getMaxChunksInRegistrationRequest()); } @Test @@ -142,7 +172,7 @@ public void testMaxClientLagEnabled() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "2 second"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, isIcebergMode); Assert.assertEquals(2000, parameterProvider.getCachedMaxClientLagInMs()); // call again to trigger caching logic Assert.assertEquals(2000, parameterProvider.getCachedMaxClientLagInMs()); @@ -153,7 +183,7 @@ public void testMaxClientLagEnabledPluralTimeUnit() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "2 seconds"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, isIcebergMode); Assert.assertEquals(2000, parameterProvider.getCachedMaxClientLagInMs()); } @@ -162,7 +192,7 @@ public void testMaxClientLagEnabledMinuteTimeUnit() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "1 minute"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, isIcebergMode); Assert.assertEquals(60000, parameterProvider.getCachedMaxClientLagInMs()); } @@ -171,7 +201,7 @@ public void testMaxClientLagEnabledMinuteTimeUnitPluralTimeUnit() { Properties prop = new Properties(); Map parameterMap = 
getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "2 minutes"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, isIcebergMode); Assert.assertEquals(120000, parameterProvider.getCachedMaxClientLagInMs()); } @@ -179,7 +209,7 @@ public void testMaxClientLagEnabledMinuteTimeUnitPluralTimeUnit() { public void testMaxClientLagEnabledDefaultValue() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, isIcebergMode); Assert.assertEquals( ParameterProvider.MAX_CLIENT_LAG_DEFAULT, parameterProvider.getCachedMaxClientLagInMs()); } @@ -189,7 +219,7 @@ public void testMaxClientLagEnabledDefaultUnit() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "3000"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, isIcebergMode); Assert.assertEquals(3000, parameterProvider.getCachedMaxClientLagInMs()); } @@ -198,7 +228,7 @@ public void testMaxClientLagEnabledLongInput() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, 3000L); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, isIcebergMode); Assert.assertEquals(3000, parameterProvider.getCachedMaxClientLagInMs()); } @@ -207,7 +237,7 @@ public void testMaxClientLagEnabledMissingUnitTimeUnitSupplied() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); 
parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, " year"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, isIcebergMode); try { parameterProvider.getCachedMaxClientLagInMs(); Assert.fail("Should not have succeeded"); @@ -221,7 +251,7 @@ public void testMaxClientLagEnabledInvalidTimeUnit() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "1 year"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, isIcebergMode); try { parameterProvider.getCachedMaxClientLagInMs(); Assert.fail("Should not have succeeded"); @@ -235,7 +265,7 @@ public void testMaxClientLagEnabledInvalidUnit() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "banana minute"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, isIcebergMode); try { parameterProvider.getCachedMaxClientLagInMs(); Assert.fail("Should not have succeeded"); @@ -249,7 +279,7 @@ public void testMaxClientLagEnabledThresholdBelow() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "0 second"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, isIcebergMode); try { parameterProvider.getCachedMaxClientLagInMs(); Assert.fail("Should not have succeeded"); @@ -263,7 +293,7 @@ public void testMaxClientLagEnabledThresholdAbove() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); 
parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "11 minutes"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, isIcebergMode); try { parameterProvider.getCachedMaxClientLagInMs(); Assert.fail("Should not have succeeded"); @@ -277,7 +307,7 @@ public void testMaxClientLagEnableEmptyInput() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, ""); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, isIcebergMode); try { parameterProvider.getCachedMaxClientLagInMs(); Assert.fail("Should not have succeeded"); @@ -287,12 +317,46 @@ public void testMaxClientLagEnableEmptyInput() { } @Test - public void testMaxChunksInBlobAndRegistrationRequest() { + public void testMaxChunksInBlob() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); - parameterMap.put("max_chunks_in_blob_and_registration_request", 1); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); - Assert.assertEquals(1, parameterProvider.getMaxChunksInBlobAndRegistrationRequest()); + parameterMap.put("max_chunks_in_blob", 1); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, isIcebergMode); + Assert.assertEquals(1, parameterProvider.getMaxChunksInBlob()); + + if (isIcebergMode) { + SFException e = + Assert.assertThrows( + SFException.class, + () -> { + parameterMap.put("max_chunks_in_blob", 100); + new ParameterProvider(parameterMap, prop, isIcebergMode); + }); + Assert.assertEquals(e.getVendorCode(), ErrorCode.INVALID_CONFIG_PARAMETER.getMessageCode()); + } + } + + @Test + public void testMaxChunksInRegistrationRequest() { + Properties prop = new Properties(); + Map parameterMap = getStartingParameterMap(); + 
parameterMap.put("max_chunks_in_registration_request", 101); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, isIcebergMode); + Assert.assertEquals(101, parameterProvider.getMaxChunksInRegistrationRequest()); + + IllegalArgumentException e = + Assert.assertThrows( + IllegalArgumentException.class, + () -> { + parameterMap.put("max_chunks_in_registration_request", 0); + new ParameterProvider(parameterMap, prop, isIcebergMode); + }); + Assert.assertEquals( + e.getMessage(), + String.format( + "max_chunks_in_blobs (%s) should be less than or equal to" + + " make_chunks_in_registration_request (%s)", + parameterProvider.getMaxChunksInBlob(), 0)); } @Test @@ -303,7 +367,8 @@ public void testValidCompressionAlgorithmsAndWithUppercaseLowerCase() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM, v); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = + new ParameterProvider(parameterMap, prop, isIcebergMode); Assert.assertEquals( Constants.BdecParquetCompression.GZIP, parameterProvider.getBdecParquetCompressionAlgorithm()); @@ -314,7 +379,8 @@ public void testValidCompressionAlgorithmsAndWithUppercaseLowerCase() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM, v); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = + new ParameterProvider(parameterMap, prop, isIcebergMode); Assert.assertEquals( Constants.BdecParquetCompression.ZSTD, parameterProvider.getBdecParquetCompressionAlgorithm()); @@ -326,7 +392,7 @@ public void testInvalidCompressionAlgorithm() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); 
parameterMap.put(ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM, "invalid_comp"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, isIcebergMode); try { parameterProvider.getBdecParquetCompressionAlgorithm(); Assert.fail("Should not have succeeded"); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/RegisterServiceTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/RegisterServiceTest.java index 37eb5f96e..9d7b95008 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/RegisterServiceTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/RegisterServiceTest.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + package net.snowflake.ingest.streaming.internal; import static net.snowflake.ingest.utils.Constants.BLOB_UPLOAD_TIMEOUT_IN_SEC; @@ -14,8 +18,17 @@ import org.junit.Assert; import org.junit.Ignore; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +@RunWith(Parameterized.class) public class RegisterServiceTest { + @Parameterized.Parameters(name = "isIcebergMode: {0}") + public static Object[] isIcebergMode() { + return new Object[] {false, true}; + } + + @Parameterized.Parameter public boolean isIcebergMode; @Test public void testRegisterService() throws ExecutionException, InterruptedException { @@ -45,7 +58,7 @@ public void testRegisterService() throws ExecutionException, InterruptedExceptio @Test public void testRegisterServiceTimeoutException() throws Exception { SnowflakeStreamingIngestClientInternal client = - new SnowflakeStreamingIngestClientInternal<>("client"); + new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode); RegisterService rs = new RegisterService<>(client, true); Pair, CompletableFuture> blobFuture1 = @@ -73,7 +86,7 @@ public void 
testRegisterServiceTimeoutException() throws Exception { @Test public void testRegisterServiceTimeoutException_testRetries() throws Exception { SnowflakeStreamingIngestClientInternal client = - new SnowflakeStreamingIngestClientInternal<>("client"); + new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode); RegisterService rs = new RegisterService<>(client, true); Pair, CompletableFuture> blobFuture1 = @@ -107,7 +120,7 @@ public void testRegisterServiceTimeoutException_testRetries() throws Exception { @Test public void testRegisterServiceNonTimeoutException() { SnowflakeStreamingIngestClientInternal client = - new SnowflakeStreamingIngestClientInternal<>("client"); + new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode); RegisterService rs = new RegisterService<>(client, true); CompletableFuture future = new CompletableFuture<>(); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java index 5d8d8d36a..dc3285df8 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java @@ -48,8 +48,11 @@ import net.snowflake.ingest.utils.Utils; import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.mockito.Mockito; +@RunWith(Parameterized.class) public class SnowflakeStreamingIngestChannelTest { /** @@ -71,6 +74,13 @@ public long getFreeMemory() { } } + @Parameterized.Parameters(name = "isIcebergMode: {0}") + public static Object[] isIcebergMode() { + return new Object[] {false, true}; + } + + @Parameterized.Parameter public boolean isIcebergMode; + @Test public void testChannelFactoryNullFields() { String name = "CHANNEL"; @@ -80,7 +90,7 @@ public void 
testChannelFactoryNullFields() { long channelSequencer = 0L; long rowSequencer = 0L; SnowflakeStreamingIngestClientInternal client = - new SnowflakeStreamingIngestClientInternal<>("client"); + new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode); Object[] fields = new Object[] { @@ -122,7 +132,7 @@ public void testChannelFactorySuccess() { long rowSequencer = 0L; SnowflakeStreamingIngestClientInternal client = - new SnowflakeStreamingIngestClientInternal<>("client"); + new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode); SnowflakeStreamingIngestChannelInternal channel = SnowflakeStreamingIngestChannelFactory.builder(name) @@ -157,7 +167,7 @@ public void testChannelFactorySuccess() { @Test public void testChannelValid() { SnowflakeStreamingIngestClientInternal client = - new SnowflakeStreamingIngestClientInternal<>("client"); + new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode); SnowflakeStreamingIngestChannelInternal channel = new SnowflakeStreamingIngestChannelInternal<>( "channel", @@ -207,7 +217,7 @@ public void testChannelValid() { @Test public void testChannelClose() { SnowflakeStreamingIngestClientInternal client = - new SnowflakeStreamingIngestClientInternal<>("client"); + new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode); SnowflakeStreamingIngestChannelInternal channel = new SnowflakeStreamingIngestChannelInternal<>( "channel", @@ -351,6 +361,7 @@ public void testOpenChannelErrorResponse() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + isIcebergMode, true, requestBuilder, null); @@ -421,6 +432,7 @@ public void testOpenChannelSnowflakeInternalErrorResponse() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + isIcebergMode, true, requestBuilder, null); @@ -443,6 +455,10 @@ public void testOpenChannelSnowflakeInternalErrorResponse() throws Exception { @Test public void testOpenChannelSuccessResponse() throws 
Exception { + // TODO: SNOW-1490151 Iceberg testing gaps + if (isIcebergMode) { + return; + } String name = "CHANNEL"; String dbName = "STREAMINGINGEST_TEST"; String schemaName = "PUBLIC"; @@ -503,6 +519,7 @@ public void testOpenChannelSuccessResponse() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + isIcebergMode, true, requestBuilder, null); @@ -541,7 +558,9 @@ public void testOpenChannelSuccessResponse() throws Exception { @Test public void testInsertRow() { SnowflakeStreamingIngestClientInternal client; - client = new SnowflakeStreamingIngestClientInternal("client_PARQUET"); + client = + new SnowflakeStreamingIngestClientInternal( + "client_PARQUET", isIcebergMode); SnowflakeStreamingIngestChannelInternal channel = new SnowflakeStreamingIngestChannelInternal<>( "channel", @@ -625,7 +644,8 @@ public void testInsertTooLargeRow() { schema.forEach(x -> row.put(x.getName(), byteArrayOneMb)); SnowflakeStreamingIngestClientInternal client; - client = new SnowflakeStreamingIngestClientInternal("test_client"); + client = + new SnowflakeStreamingIngestClientInternal("test_client", isIcebergMode); // Test channel with on error CONTINUE SnowflakeStreamingIngestChannelInternal channel = @@ -709,7 +729,7 @@ public void testInsertRowThrottling() { memoryInfoProvider.maxMemory = maxMemory; SnowflakeStreamingIngestClientInternal client = - new SnowflakeStreamingIngestClientInternal<>("client"); + new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode); SnowflakeStreamingIngestChannelInternal channel = new SnowflakeStreamingIngestChannelInternal<>( "channel", @@ -725,7 +745,7 @@ public void testInsertRowThrottling() { OpenChannelRequest.OnErrorOption.CONTINUE, UTC); - ParameterProvider parameterProvider = new ParameterProvider(); + ParameterProvider parameterProvider = new ParameterProvider(isIcebergMode); memoryInfoProvider.freeMemory = maxMemory * (parameterProvider.getInsertThrottleThresholdInPercentage() - 1) / 100; @@ 
-755,7 +775,7 @@ public void testInsertRowThrottling() { @Test public void testFlush() throws Exception { SnowflakeStreamingIngestClientInternal client = - Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client")); + Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode)); SnowflakeStreamingIngestChannelInternal channel = new SnowflakeStreamingIngestChannelInternal<>( "channel", @@ -791,7 +811,7 @@ public void testFlush() throws Exception { @Test public void testClose() throws Exception { SnowflakeStreamingIngestClientInternal client = - Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client")); + Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode)); SnowflakeStreamingIngestChannel channel = new SnowflakeStreamingIngestChannelInternal<>( "channel", @@ -825,7 +845,7 @@ public void testClose() throws Exception { @Test public void testDropOnClose() throws Exception { SnowflakeStreamingIngestClientInternal client = - Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client")); + Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode)); SnowflakeStreamingIngestChannelInternal channel = new SnowflakeStreamingIngestChannelInternal<>( "channel", @@ -862,7 +882,7 @@ public void testDropOnClose() throws Exception { @Test public void testDropOnCloseInvalidChannel() throws Exception { SnowflakeStreamingIngestClientInternal client = - Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client")); + Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode)); SnowflakeStreamingIngestChannelInternal channel = new SnowflakeStreamingIngestChannelInternal<>( "channel", @@ -895,7 +915,7 @@ public void testDropOnCloseInvalidChannel() throws Exception { public void testGetLatestCommittedOffsetToken() { String offsetToken = "10"; SnowflakeStreamingIngestClientInternal client = - Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client")); + 
Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode)); SnowflakeStreamingIngestChannel channel = new SnowflakeStreamingIngestChannelInternal<>( "channel", diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java index 553efbd31..c9ca86b35 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + package net.snowflake.ingest.streaming.internal; import static java.time.ZoneOffset.UTC; @@ -66,9 +70,12 @@ import org.junit.Before; import org.junit.Ignore; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.mockito.ArgumentMatchers; import org.mockito.Mockito; +@RunWith(Parameterized.class) public class SnowflakeStreamingIngestClientTest { private static final ObjectMapper objectMapper = new ObjectMapper(); @@ -79,6 +86,13 @@ public class SnowflakeStreamingIngestClientTest { SnowflakeStreamingIngestChannelInternal channel3; SnowflakeStreamingIngestChannelInternal channel4; + @Parameterized.Parameters(name = "isIcebergMode: {0}") + public static Object[] isIcebergMode() { + return new Object[] {false, true}; + } + + @Parameterized.Parameter public boolean isIcebergMode; + @Before public void setup() throws Exception { objectMapper.setVisibility(PropertyAccessor.GETTER, JsonAutoDetect.Visibility.ANY); @@ -98,6 +112,7 @@ public void setup() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + isIcebergMode, true, requestBuilder, null); @@ -362,6 +377,7 @@ public void testGetChannelsStatusWithRequest() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, 
httpClient, + isIcebergMode, true, requestBuilder, null); @@ -421,6 +437,7 @@ public void testDropChannel() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + isIcebergMode, true, requestBuilder, null); @@ -466,6 +483,7 @@ public void testGetChannelsStatusWithRequestError() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + isIcebergMode, true, requestBuilder, null); @@ -519,6 +537,7 @@ public void testRegisterBlobRequestCreationSuccess() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + isIcebergMode, true, requestBuilder, null); @@ -710,6 +729,7 @@ public void testGetRetryBlobs() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + isIcebergMode, true, requestBuilder, null); @@ -751,6 +771,7 @@ public void testRegisterBlobErrorResponse() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + isIcebergMode, true, requestBuilder, null); @@ -798,6 +819,7 @@ public void testRegisterBlobSnowflakeInternalErrorResponse() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + isIcebergMode, true, requestBuilder, null); @@ -854,6 +876,7 @@ public void testRegisterBlobSuccessResponse() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + isIcebergMode, true, requestBuilder, null); @@ -938,6 +961,7 @@ public void testRegisterBlobsRetries() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + isIcebergMode, true, requestBuilder, null); @@ -967,6 +991,7 @@ public void testRegisterBlobChunkLimit() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + isIcebergMode, true, requestBuilder, null)); @@ -1139,6 +1164,7 @@ public void testRegisterBlobsRetriesSucceeds() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + isIcebergMode, true, requestBuilder, null); @@ 
-1213,6 +1239,7 @@ public void testRegisterBlobResponseWithInvalidChannel() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + isIcebergMode, true, requestBuilder, null); @@ -1263,7 +1290,7 @@ public void testRegisterBlobResponseWithInvalidChannel() throws Exception { @Test public void testFlush() throws Exception { SnowflakeStreamingIngestClientInternal client = - Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client")); + Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode)); ChannelsStatusResponse response = new ChannelsStatusResponse(); response.setStatusCode(0L); response.setMessage("Success"); @@ -1285,7 +1312,7 @@ public void testFlush() throws Exception { @Test public void testClose() throws Exception { SnowflakeStreamingIngestClientInternal client = - Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client")); + Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode)); ChannelsStatusResponse response = new ChannelsStatusResponse(); response.setStatusCode(0L); response.setMessage("Success"); @@ -1319,7 +1346,7 @@ public void testClose() throws Exception { @Test public void testCloseWithError() throws Exception { SnowflakeStreamingIngestClientInternal client = - Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client")); + Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode)); CompletableFuture future = new CompletableFuture<>(); future.completeExceptionally(new Exception("Simulating Error")); @@ -1357,7 +1384,7 @@ public void testCloseWithError() throws Exception { @Test public void testVerifyChannelsAreFullyCommittedSuccess() throws Exception { SnowflakeStreamingIngestClientInternal client = - Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client")); + Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode)); SnowflakeStreamingIngestChannelInternal channel = new 
SnowflakeStreamingIngestChannelInternal<>( "channel1", @@ -1405,6 +1432,7 @@ public void testFlushServiceException() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + isIcebergMode, true, requestBuilder, parameterMap); @@ -1441,6 +1469,7 @@ public void testGetLatestCommittedOffsetTokens() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, + isIcebergMode, true, requestBuilder, null);