From 1657d1453fb2698829d264078d157b1cd4043090 Mon Sep 17 00:00:00 2001 From: Alec Huang Date: Thu, 11 Jul 2024 14:30:02 -0700 Subject: [PATCH] eliminate iceberg logic --- .../connection/ServiceResponseHandler.java | 3 +- ...SnowflakeStreamingIngestClientFactory.java | 14 +- .../streaming/internal/BlobBuilder.java | 67 +++---- .../internal/ChannelConfigureRequest.java | 53 ------ .../internal/ChannelConfigureResponse.java | 46 ----- .../internal/DropChannelRequestInternal.java | 15 +- .../internal/ExternalVolumeManager.java | 173 ------------------ .../streaming/internal/FlushService.java | 7 +- .../internal/InternalParameterProvider.java | 20 -- .../internal/OpenChannelRequestInternal.java | 15 +- .../internal/SnowflakeServiceClient.java | 28 --- ...nowflakeStreamingIngestClientInternal.java | 41 +---- .../internal/StreamingIngestStorage.java | 16 -- .../net/snowflake/ingest/utils/Constants.java | 1 - .../net/snowflake/ingest/utils/ErrorCode.java | 3 +- .../ingest/utils/ParameterProvider.java | 125 ++++--------- .../streaming/internal/BlobBuilderTest.java | 12 +- .../streaming/internal/ChannelCacheTest.java | 6 +- .../streaming/internal/FlushServiceTest.java | 88 ++++----- .../internal/InsertRowsBenchmarkTest.java | 2 +- .../InternalParameterProviderTest.java | 26 --- .../streaming/internal/OAuthBasicTest.java | 6 +- .../internal/ParameterProviderTest.java | 102 +++-------- .../internal/RegisterServiceTest.java | 19 +- .../SnowflakeStreamingIngestChannelTest.java | 46 ++--- .../SnowflakeStreamingIngestClientTest.java | 36 +--- .../internal/StreamingIngestStorageTest.java | 2 - 27 files changed, 176 insertions(+), 796 deletions(-) delete mode 100644 src/main/java/net/snowflake/ingest/streaming/internal/ChannelConfigureRequest.java delete mode 100644 src/main/java/net/snowflake/ingest/streaming/internal/ChannelConfigureResponse.java delete mode 100644 src/main/java/net/snowflake/ingest/streaming/internal/ExternalVolumeManager.java delete mode 100644 src/main/java/net/snowflake/ingest/streaming/internal/InternalParameterProvider.java delete mode 100644 src/test/java/net/snowflake/ingest/streaming/internal/InternalParameterProviderTest.java diff --git a/src/main/java/net/snowflake/ingest/connection/ServiceResponseHandler.java b/src/main/java/net/snowflake/ingest/connection/ServiceResponseHandler.java index 034b4a6f0..7db9f4dab 100644 --- a/src/main/java/net/snowflake/ingest/connection/ServiceResponseHandler.java +++ b/src/main/java/net/snowflake/ingest/connection/ServiceResponseHandler.java @@ -42,8 +42,7 @@ public enum ApiName { STREAMING_DROP_CHANNEL("POST"), STREAMING_CHANNEL_STATUS("POST"), STREAMING_REGISTER_BLOB("POST"), - STREAMING_CLIENT_CONFIGURE("POST"), - STREAMING_CHANNEL_CONFIGURE("POST"); + STREAMING_CLIENT_CONFIGURE("POST"); private final String httpMethod; private ApiName(String httpMethod) { diff --git a/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClientFactory.java b/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClientFactory.java index 89e528693..4487d6d00 100644 --- a/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClientFactory.java +++ b/src/main/java/net/snowflake/ingest/streaming/SnowflakeStreamingIngestClientFactory.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 Snowflake Computing Inc. All rights reserved. + * Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved. */ package net.snowflake.ingest.streaming; @@ -49,11 +49,6 @@ public Builder setParameterOverrides(Map parameterOverrides) { return this; } - public Builder setIsIceberg(boolean isIcebergMode) { - this.isIcebergMode = isIcebergMode; - return this; - } - public Builder setIsTestMode(boolean isTestMode) { this.isTestMode = isTestMode; return this; @@ -67,12 +62,7 @@ public SnowflakeStreamingIngestClient build() { SnowflakeURL accountURL = new SnowflakeURL(prop.getProperty(Constants.ACCOUNT_URL)); return new SnowflakeStreamingIngestClientInternal<>( - this.name, - accountURL, - prop, - this.parameterOverrides, - this.isIcebergMode, - this.isTestMode); + this.name, accountURL, prop, this.parameterOverrides, this.isTestMode); } } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java b/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java index 75a623f69..dbee9d2ea 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 Snowflake Computing Inc. All rights reserved. + * Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved. */ package net.snowflake.ingest.streaming.internal; @@ -61,14 +61,10 @@ class BlobBuilder { * @param blobData All the data for one blob. Assumes that all ChannelData in the inner List * belongs to the same table. Will error if this is not the case * @param bdecVersion version of blob - * @param encrypt If the output chunk is encrypted or not * @return {@link Blob} data */ static Blob constructBlobAndMetadata( - String filePath, - List>> blobData, - Constants.BdecVersion bdecVersion, - boolean encrypt) + String filePath, List>> blobData, Constants.BdecVersion bdecVersion) throws IOException, NoSuchPaddingException, NoSuchAlgorithmException, InvalidAlgorithmParameterException, InvalidKeyException, IllegalBlockSizeException, BadPaddingException { @@ -87,32 +83,24 @@ static Blob constructBlobAndMetadata( flusher.serialize(channelsDataPerTable, filePath); if (!serializedChunk.channelsMetadataList.isEmpty()) { - byte[] compressedChunkData; - int chunkLength; - - if (encrypt) { - Pair paddedChunk = - padChunk(serializedChunk.chunkData, Constants.ENCRYPTION_ALGORITHM_BLOCK_SIZE_BYTES); - byte[] paddedChunkData = paddedChunk.getFirst(); - chunkLength = paddedChunk.getSecond(); - - // Encrypt the compressed chunk data, the encryption key is derived using the key from - // server with the full blob path. - // We need to maintain IV as a block counter for the whole file, even interleaved, - // to align with decryption on the Snowflake query path. - // TODO: address alignment for the header SNOW-557866 - long iv = curDataSize / Constants.ENCRYPTION_ALGORITHM_BLOCK_SIZE_BYTES; - compressedChunkData = - Cryptor.encrypt( - paddedChunkData, firstChannelFlushContext.getEncryptionKey(), filePath, iv); - } else { - compressedChunkData = serializedChunk.chunkData.toByteArray(); - chunkLength = compressedChunkData.length; - } + ByteArrayOutputStream chunkData = serializedChunk.chunkData; + Pair paddedChunk = + padChunk(chunkData, Constants.ENCRYPTION_ALGORITHM_BLOCK_SIZE_BYTES); + byte[] paddedChunkData = paddedChunk.getFirst(); + int paddedChunkLength = paddedChunk.getSecond(); + // Encrypt the compressed chunk data, the encryption key is derived using the key from + // server with the full blob path. + // We need to maintain IV as a block counter for the whole file, even interleaved, + // to align with decryption on the Snowflake query path. + // TODO: address alignment for the header SNOW-557866 + long iv = curDataSize / Constants.ENCRYPTION_ALGORITHM_BLOCK_SIZE_BYTES; + byte[] encryptedCompressedChunkData = + Cryptor.encrypt( + paddedChunkData, firstChannelFlushContext.getEncryptionKey(), filePath, iv); // Compute the md5 of the chunk data - String md5 = computeMD5(compressedChunkData, chunkLength); - int compressedChunkDataSize = compressedChunkData.length; + String md5 = computeMD5(encryptedCompressedChunkData, paddedChunkLength); + int encryptedCompressedChunkDataSize = encryptedCompressedChunkData.length; // Create chunk metadata long startOffset = curDataSize; @@ -122,9 +110,9 @@ static Blob constructBlobAndMetadata( // The start offset will be updated later in BlobBuilder#build to include the blob // header .setChunkStartOffset(startOffset) - // The chunkLength is used because it is the actual data size used for + // The paddedChunkLength is used because it is the actual data size used for // decompression and md5 calculation on server side. - .setChunkLength(chunkLength) + .setChunkLength(paddedChunkLength) .setUncompressedChunkLength((int) serializedChunk.chunkEstimatedUncompressedSize) .setChannelList(serializedChunk.channelsMetadataList) .setChunkMD5(md5) @@ -138,22 +126,21 @@ static Blob constructBlobAndMetadata( // Add chunk metadata and data to the list chunksMetadataList.add(chunkMetadata); - chunksDataList.add(compressedChunkData); - curDataSize += compressedChunkDataSize; - crc.update(compressedChunkData, 0, compressedChunkDataSize); + chunksDataList.add(encryptedCompressedChunkData); + curDataSize += encryptedCompressedChunkDataSize; + crc.update(encryptedCompressedChunkData, 0, encryptedCompressedChunkDataSize); logger.logInfo( "Finish building chunk in blob={}, table={}, rowCount={}, startOffset={}," - + " estimatedUncompressedSize={}, chunkLength={}, compressedSize={}," - + " encryption={}, bdecVersion={}", + + " estimatedUncompressedSize={}, paddedChunkLength={}, encryptedCompressedSize={}," + + " bdecVersion={}", filePath, firstChannelFlushContext.getFullyQualifiedTableName(), serializedChunk.rowCount, startOffset, serializedChunk.chunkEstimatedUncompressedSize, - chunkLength, - compressedChunkDataSize, - encrypt, + paddedChunkLength, + encryptedCompressedChunkDataSize, bdecVersion); } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelConfigureRequest.java b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelConfigureRequest.java deleted file mode 100644 index 31bca95ac..000000000 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelConfigureRequest.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. - */ - -package net.snowflake.ingest.streaming.internal; - -import com.fasterxml.jackson.annotation.JsonProperty; - -/** Class used to serialize the channel configure request. */ -class ChannelConfigureRequest extends ConfigureRequest { - @JsonProperty("database") - private String database; - - @JsonProperty("schema") - private String schema; - - @JsonProperty("table") - private String table; - - /** - * Constructor for channel configure request - * - * @param role Role to be used for the request. - * @param database Database name. - * @param schema Schema name. - * @param table Table name. - */ - ChannelConfigureRequest(String role, String database, String schema, String table) { - setRole(role); - this.database = database; - this.schema = schema; - this.table = table; - } - - String getDatabase() { - return database; - } - - String getSchema() { - return schema; - } - - String getTable() { - return table; - } - - @Override - public String getStringForLogging() { - return String.format( - "ChannelConfigureRequest(role=%s, db=%s, schema=%s, table=%s, file_name=%s)", - getRole(), database, schema, table, getFileName()); - } -} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelConfigureResponse.java b/src/main/java/net/snowflake/ingest/streaming/internal/ChannelConfigureResponse.java deleted file mode 100644 index da65960b4..000000000 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ChannelConfigureResponse.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. - */ - -package net.snowflake.ingest.streaming.internal; - -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import com.fasterxml.jackson.annotation.JsonProperty; - -/** Class used to deserialize responses from channel configure endpoint */ -@JsonIgnoreProperties(ignoreUnknown = true) -class ChannelConfigureResponse extends StreamingIngestResponse { - @JsonProperty("status_code") - private Long statusCode; - - @JsonProperty("message") - private String message; - - @JsonProperty("stage_location") - private FileLocationInfo stageLocation; - - @Override - Long getStatusCode() { - return statusCode; - } - - void setStatusCode(Long statusCode) { - this.statusCode = statusCode; - } - - String getMessage() { - return message; - } - - void setMessage(String message) { - this.message = message; - } - - FileLocationInfo getStageLocation() { - return stageLocation; - } - - void setStageLocation(FileLocationInfo stageLocation) { - this.stageLocation = stageLocation; - } -} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/DropChannelRequestInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/DropChannelRequestInternal.java index 25599999e..a6e1e1746 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/DropChannelRequestInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/DropChannelRequestInternal.java @@ -29,9 +29,6 @@ class DropChannelRequestInternal implements StreamingIngestRequest { @JsonProperty("schema") private String schema; - @JsonProperty("is_iceberg") - private boolean isIceberg; - @JsonInclude(JsonInclude.Include.NON_NULL) @JsonProperty("client_sequencer") Long clientSequencer; @@ -43,8 +40,7 @@ class DropChannelRequestInternal implements StreamingIngestRequest { String schema, String table, String channel, - Long clientSequencer, - boolean isIceberg) { + Long clientSequencer) { this.requestId = requestId; this.role = role; this.database = database; @@ -52,7 +48,6 @@ class DropChannelRequestInternal implements StreamingIngestRequest { this.table = table; this.channel = channel; this.clientSequencer = clientSequencer; - this.isIceberg = isIceberg; } String getRequestId() { @@ -79,10 +74,6 @@ String getSchema() { return schema; } - boolean getIsIceberg() { - return isIceberg; - } - Long getClientSequencer() { return clientSequencer; } @@ -95,7 +86,7 @@ String getFullyQualifiedTableName() { public String getStringForLogging() { return String.format( "DropChannelRequest(requestId=%s, role=%s, db=%s, schema=%s, table=%s, channel=%s," - + " isIceberg=%s, clientSequencer=%s)", - requestId, role, database, schema, table, channel, isIceberg, clientSequencer); + + " clientSequencer=%s)", + requestId, role, database, schema, table, channel, clientSequencer); } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ExternalVolumeManager.java b/src/main/java/net/snowflake/ingest/streaming/internal/ExternalVolumeManager.java deleted file mode 100644 index 2b8796bb2..000000000 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ExternalVolumeManager.java +++ /dev/null @@ -1,173 +0,0 @@ -/* - * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. - */ - -package net.snowflake.ingest.streaming.internal; - -import java.io.IOException; -import java.util.Calendar; -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.ConcurrentHashMap; -import net.snowflake.client.jdbc.SnowflakeSQLException; -import net.snowflake.ingest.connection.IngestResponseException; -import net.snowflake.ingest.utils.ErrorCode; -import net.snowflake.ingest.utils.SFException; -import net.snowflake.ingest.utils.Utils; - -class ExternalVolumeLocation { - public final String dbName; - public final String schemaName; - public final String tableName; - - public ExternalVolumeLocation(String dbName, String schemaName, String tableName) { - this.dbName = dbName; - this.schemaName = schemaName; - this.tableName = tableName; - } -} - -/** Class to manage multiple external volumes */ -class ExternalVolumeManager implements StorageManager { - // Reference to the external volume per table - private final Map> externalVolumeMap; - - // name of the owning client - private final String clientName; - - // role of the owning client - private final String role; - - // Reference to the Snowflake service client used for configure calls - private final SnowflakeServiceClient snowflakeServiceClient; - - // Client prefix generated by the Snowflake server - private final String clientPrefix; - - /** - * Constructor for ExternalVolumeManager - * - * @param isTestMode whether the manager in test mode - * @param role the role of the client - * @param clientName the name of the client - * @param snowflakeServiceClient the Snowflake service client used for configure calls - */ - ExternalVolumeManager( - boolean isTestMode, - String role, - String clientName, - SnowflakeServiceClient snowflakeServiceClient) { - this.role = role; - this.clientName = clientName; - this.snowflakeServiceClient = snowflakeServiceClient; - this.externalVolumeMap = new ConcurrentHashMap<>(); - try { - this.clientPrefix = - isTestMode - ? "testPrefix" - : this.snowflakeServiceClient - .clientConfigure(new ClientConfigureRequest(role)) - .getClientPrefix(); - } catch (IngestResponseException | IOException e) { - throw new SFException(e, ErrorCode.CLIENT_CONFIGURE_FAILURE, e.getMessage()); - } - } - - /** - * Given a fully qualified table name, return the target storage by looking up the table name - * - * @param fullyQualifiedTableName the target fully qualified table name - * @return target storage - */ - @Override - public StreamingIngestStorage getStorage( - String fullyQualifiedTableName) { - // Only one chunk per blob in Iceberg mode. - StreamingIngestStorage stage = - this.externalVolumeMap.get(fullyQualifiedTableName); - - if (stage == null) { - throw new SFException( - ErrorCode.INTERNAL_ERROR, - String.format("No external volume found for table %s", fullyQualifiedTableName)); - } - - return stage; - } - - /** - * Add a storage to the manager by looking up the table name from the open channel response - * - * @param dbName the database name - * @param schemaName the schema name - * @param tableName the table name - * @param fileLocationInfo response from open channel - */ - @Override - public void addStorage( - String dbName, String schemaName, String tableName, FileLocationInfo fileLocationInfo) { - String fullyQualifiedTableName = - Utils.getFullyQualifiedTableName(dbName, schemaName, tableName); - - try { - this.externalVolumeMap.put( - fullyQualifiedTableName, - new StreamingIngestStorage( - this, - this.clientName, - fileLocationInfo, - new ExternalVolumeLocation(dbName, schemaName, tableName), - DEFAULT_MAX_UPLOAD_RETRIES)); - } catch (SnowflakeSQLException | IOException err) { - throw new SFException(err, ErrorCode.UNABLE_TO_CONNECT_TO_STORAGE); - } - } - - /** - * Gets the latest file location info (with a renewed short-lived access token) for the specified - * location - * - * @param location A reference to the target location - * @param fileName optional filename for single-file signed URL fetch from server - * @return the new location information - */ - @Override - public FileLocationInfo getRefreshedLocation( - ExternalVolumeLocation location, Optional fileName) { - try { - ChannelConfigureRequest request = - new ChannelConfigureRequest( - this.role, location.dbName, location.schemaName, location.tableName); - fileName.ifPresent(request::setFileName); - ChannelConfigureResponse response = this.snowflakeServiceClient.channelConfigure(request); - return response.getStageLocation(); - } catch (IngestResponseException | IOException e) { - throw new SFException(e, ErrorCode.CLIENT_CONFIGURE_FAILURE, e.getMessage()); - } - } - - // TODO: SNOW-1502887 Blob path generation for iceberg table - @Override - public String generateBlobPath() { - return "snow_dummy_file_name"; - } - - // TODO: SNOW-1502887 Blob path generation for iceberg table - @Override - public void decrementBlobSequencer() {} - - // TODO: SNOW-1502887 Blob path generation for iceberg table - public String getBlobPath(Calendar calendar, String clientPrefix) { - return ""; - } - - /** - * Get the client prefix from first external volume in the map - * - * @return the client prefix - */ - @Override - public String getClientPrefix() { - return this.clientPrefix; - } -} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java index 7f3aafd56..b34335194 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java @@ -522,12 +522,7 @@ BlobMetadata buildAndUpload( Timer.Context buildContext = Utils.createTimerContext(this.owningClient.buildLatency); // Construct the blob along with the metadata of the blob - BlobBuilder.Blob blob = - BlobBuilder.constructBlobAndMetadata( - blobPath, - blobData, - bdecVersion, - this.owningClient.getInternalParameterProvider().getEnableChunkEncryption()); + BlobBuilder.Blob blob = BlobBuilder.constructBlobAndMetadata(blobPath, blobData, bdecVersion); blob.blobStats.setBuildDurationMs(buildContext); diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/InternalParameterProvider.java b/src/main/java/net/snowflake/ingest/streaming/internal/InternalParameterProvider.java deleted file mode 100644 index 46a99b671..000000000 --- a/src/main/java/net/snowflake/ingest/streaming/internal/InternalParameterProvider.java +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. - */ - -package net.snowflake.ingest.streaming.internal; - -/** A class to provide non-configurable constants depends on Iceberg or non-Iceberg mode */ -class InternalParameterProvider { - private final boolean isIcebergMode; - - InternalParameterProvider(boolean isIcebergMode) { - this.isIcebergMode = isIcebergMode; - } - - boolean getEnableChunkEncryption() { - // When in Iceberg mode, chunk encryption is disabled. Otherwise, it is enabled. Since Iceberg - // mode does not need client-side encryption. - return !isIcebergMode; - } -} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/OpenChannelRequestInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/OpenChannelRequestInternal.java index ffbb553f3..f58198d31 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/OpenChannelRequestInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/OpenChannelRequestInternal.java @@ -32,9 +32,6 @@ class OpenChannelRequestInternal implements StreamingIngestRequest { @JsonProperty("write_mode") private String writeMode; - @JsonProperty("is_iceberg") - private boolean isIceberg; - @JsonInclude(JsonInclude.Include.NON_NULL) @JsonProperty("offset_token") private String offsetToken; @@ -47,8 +44,7 @@ class OpenChannelRequestInternal implements StreamingIngestRequest { String table, String channel, Constants.WriteMode writeMode, - String offsetToken, - boolean isIceberg) { + String offsetToken) { this.requestId = requestId; this.role = role; this.database = database; @@ -56,7 +52,6 @@ class OpenChannelRequestInternal implements StreamingIngestRequest { this.table = table; this.channel = channel; this.writeMode = writeMode.name(); - this.isIceberg = isIceberg; this.offsetToken = offsetToken; } @@ -88,10 +83,6 @@ String getWriteMode() { return writeMode; } - boolean getIsIceberg() { - return isIceberg; - } - String getOffsetToken() { return offsetToken; } @@ -100,7 +91,7 @@ String getOffsetToken() { public String getStringForLogging() { return String.format( "OpenChannelRequestInternal(requestId=%s, role=%s, db=%s, schema=%s, table=%s, channel=%s," - + " writeMode=%s, isIceberg=%s)", - requestId, role, database, schema, table, channel, writeMode, isIceberg); + + " writeMode=%s)", + requestId, role, database, schema, table, channel, writeMode); } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeServiceClient.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeServiceClient.java index 947c86dbb..904e4f93a 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeServiceClient.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeServiceClient.java @@ -4,14 +4,12 @@ package net.snowflake.ingest.streaming.internal; -import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_CHANNEL_CONFIGURE; import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_CHANNEL_STATUS; import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_CLIENT_CONFIGURE; import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_DROP_CHANNEL; import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_OPEN_CHANNEL; import static net.snowflake.ingest.connection.ServiceResponseHandler.ApiName.STREAMING_REGISTER_BLOB; import static net.snowflake.ingest.streaming.internal.StreamingIngestUtils.executeWithRetries; -import static net.snowflake.ingest.utils.Constants.CHANNEL_CONFIGURE_ENDPOINT; import static net.snowflake.ingest.utils.Constants.CHANNEL_STATUS_ENDPOINT; import static net.snowflake.ingest.utils.Constants.CLIENT_CONFIGURE_ENDPOINT; import static net.snowflake.ingest.utils.Constants.DROP_CHANNEL_ENDPOINT; @@ -76,32 +74,6 @@ ClientConfigureResponse clientConfigure(ClientConfigureRequest request) return response; } - /** - * Configures a storage given a {@link ChannelConfigureRequest}. - * - * @param request the channel configuration request - * @return the response from the configuration request - */ - ChannelConfigureResponse channelConfigure(ChannelConfigureRequest request) - throws IngestResponseException, IOException { - ChannelConfigureResponse response = - executeApiRequestWithRetries( - ChannelConfigureResponse.class, - request, - CHANNEL_CONFIGURE_ENDPOINT, - "channel configure", - STREAMING_CHANNEL_CONFIGURE); - - if (response.getStatusCode() != RESPONSE_SUCCESS) { - logger.logDebug( - "Channel configure request failed, request={}, response={}", - request.getStringForLogging(), - response.getMessage()); - throw new SFException(ErrorCode.CHANNEL_CONFIGURE_FAILURE, response.getMessage()); - } - return response; - } - /** * Opens a channel given a {@link OpenChannelRequestInternal}. * diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java index 43397389d..aee73b8c4 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java @@ -94,9 +94,6 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea // Provides constant values that can be set by constructor private final ParameterProvider parameterProvider; - // Provides constant values which is determined by the Iceberg or non-Iceberg mode - private final InternalParameterProvider internalParameterProvider; - // Name of the client private final String name; @@ -118,9 +115,6 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea // Indicates whether the client has closed private volatile boolean isClosed; - // Indicates wheter the client is streaming to Iceberg tables - private final boolean isIcebergMode; - // Indicates whether the client is under test mode private final boolean isTestMode; @@ -155,7 +149,6 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea * @param accountURL Snowflake account url * @param prop connection properties * @param httpClient http client for sending request - * @param isIcebergMode whether we're streaming to iceberg tables * @param isTestMode whether we're under test mode * @param requestBuilder http request builder * @param parameterOverrides parameters we override in case we want to set different values @@ -165,16 +158,13 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea SnowflakeURL accountURL, Properties prop, CloseableHttpClient httpClient, - boolean isIcebergMode, boolean isTestMode, RequestBuilder requestBuilder, Map parameterOverrides) { - this.parameterProvider = new ParameterProvider(parameterOverrides, prop, isIcebergMode); - this.internalParameterProvider = new InternalParameterProvider(isIcebergMode); + this.parameterProvider = new ParameterProvider(parameterOverrides, prop); this.name = name; String accountName = accountURL == null ? null : accountURL.getAccount(); - this.isIcebergMode = isIcebergMode; this.isTestMode = isTestMode; this.httpClient = httpClient == null ? HttpUtil.getHttpClient(accountName) : httpClient; this.channelCache = new ChannelCache<>(); @@ -238,11 +228,7 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea this.snowflakeServiceClient = new SnowflakeServiceClient(this.httpClient, this.requestBuilder); this.storageManager = - isIcebergMode - ? new ExternalVolumeManager( - isTestMode, this.role, this.name, this.snowflakeServiceClient) - : new InternalStageManager( - isTestMode, this.role, this.name, this.snowflakeServiceClient); + new InternalStageManager(isTestMode, this.role, this.name, this.snowflakeServiceClient); try { this.flushService = @@ -268,7 +254,6 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea * @param accountURL Snowflake account url * @param prop connection properties * @param parameterOverrides map of parameters to override for this client - * @param isIcebergMode whether we're streaming to iceberg tables * @param isTestMode indicates whether it's under test mode */ public SnowflakeStreamingIngestClientInternal( @@ -276,17 +261,16 @@ public SnowflakeStreamingIngestClientInternal( SnowflakeURL accountURL, Properties prop, Map parameterOverrides, - boolean isIcebergMode, boolean isTestMode) { - this(name, accountURL, prop, null, isIcebergMode, isTestMode, null, parameterOverrides); + this(name, accountURL, prop, null, isTestMode, null, parameterOverrides); } /*** Constructor for TEST ONLY * * @param name the name of the client */ - SnowflakeStreamingIngestClientInternal(String name, boolean isIcebergMode) { - this(name, null, null, null, isIcebergMode, true, null, new HashMap<>()); + SnowflakeStreamingIngestClientInternal(String name) { + this(name, null, null, null, true, null, new HashMap<>()); } // TESTING ONLY - inject the request builder @@ -349,8 +333,7 @@ public SnowflakeStreamingIngestChannelInternal openChannel(OpenChannelRequest request.getTableName(), request.getChannelName(), Constants.WriteMode.CLOUD_STORAGE, - request.getOffsetToken(), - isIcebergMode); + request.getOffsetToken()); OpenChannelResponse response = snowflakeServiceClient.openChannel(openChannelRequest); logger.logInfo( @@ -421,8 +404,7 @@ public void dropChannel(DropChannelRequest request) { request.getChannelName(), request instanceof DropChannelVersionRequest ? ((DropChannelVersionRequest) request).getClientSequencer() - : null, - isIcebergMode); + : null); snowflakeServiceClient.dropChannel(dropChannelRequest); logger.logInfo( @@ -904,15 +886,6 @@ ParameterProvider getParameterProvider() { return parameterProvider; } - /** - * Get InternalParameterProvider with internal parameters - * - * @return {@link InternalParameterProvider} used by the client - */ - InternalParameterProvider getInternalParameterProvider() { - return internalParameterProvider; - } - /** * Set refresh token, this method is for refresh token renewal without requiring to restart * client. This method only works when the authorization type is OAuth diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestStorage.java b/src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestStorage.java index eb9f10826..97c8e4a05 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestStorage.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/StreamingIngestStorage.java @@ -280,22 +280,6 @@ private SnowflakeFileTransferMetadataWithAge createFileTransferMetadataWithAge( return this.fileTransferMetadataWithAge; } - /** - * Creates a client-specific prefix that will be also part of the files registered by this client. - * The prefix will include a server-side generated string and the GlobalID of the deployment the - * client is registering blobs to. The latter (deploymentId) is needed in order to guarantee that - * blob filenames are unique across deployments even with replication enabled. - * - * @param response the client/configure response from the server - * @return the client prefix. - */ - private String createClientPrefix(final ClientConfigureResponse response) { - final String prefix = response.getPrefix() == null ? "" : response.getPrefix(); - final String deploymentId = - response.getDeploymentId() != null ? "_" + response.getDeploymentId() : ""; - return prefix + deploymentId; - } - /** * GCS requires a signed url per file. We need to fetch this from the server for each put * diff --git a/src/main/java/net/snowflake/ingest/utils/Constants.java b/src/main/java/net/snowflake/ingest/utils/Constants.java index 3d09d9a2a..15ea7f058 100644 --- a/src/main/java/net/snowflake/ingest/utils/Constants.java +++ b/src/main/java/net/snowflake/ingest/utils/Constants.java @@ -52,7 +52,6 @@ public class Constants { public static final String BLOB_EXTENSION_TYPE = "bdec"; public static final int MAX_THREAD_COUNT = Integer.MAX_VALUE; public static final String CLIENT_CONFIGURE_ENDPOINT = "/v1/streaming/client/configure/"; - public static final String CHANNEL_CONFIGURE_ENDPOINT = "/v1/streaming/channels/configure/"; public static final int COMMIT_MAX_RETRY_COUNT = 60; public static final int COMMIT_RETRY_INTERVAL_IN_MS = 1000; public static final String ENCRYPTION_ALGORITHM = "AES/CTR/NoPadding"; diff --git a/src/main/java/net/snowflake/ingest/utils/ErrorCode.java b/src/main/java/net/snowflake/ingest/utils/ErrorCode.java index 4091e98bf..565973a96 100644 --- a/src/main/java/net/snowflake/ingest/utils/ErrorCode.java +++ b/src/main/java/net/snowflake/ingest/utils/ErrorCode.java @@ -41,8 +41,7 @@ public enum ErrorCode { OAUTH_REFRESH_TOKEN_ERROR("0033"), INVALID_CONFIG_PARAMETER("0034"), CRYPTO_PROVIDER_ERROR("0035"), - DROP_CHANNEL_FAILURE("0036"), - CHANNEL_CONFIGURE_FAILURE("0037"); + DROP_CHANNEL_FAILURE("0036"); public static final String errorMessageResource = "net.snowflake.ingest.ingest_error_messages"; diff --git a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java index 7d8cb230f..c5779adc6 100644 --- a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java +++ b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java @@ -4,8 +4,6 @@ package net.snowflake.ingest.utils; -import static net.snowflake.ingest.utils.ErrorCode.INVALID_CONFIG_PARAMETER; - import java.util.HashMap; import java.util.Map; import java.util.Properties; @@ -61,7 +59,6 @@ public class ParameterProvider { // Lag related parameters public static final long MAX_CLIENT_LAG_DEFAULT = 1000; // 1 second - public static final long MAX_CLIENT_LAG_ICEBERG_MODE_DEFAULT = 30000; // 30 seconds static final long MAX_CLIENT_LAG_MS_MIN = TimeUnit.SECONDS.toMillis(1); static final long MAX_CLIENT_LAG_MS_MAX = TimeUnit.MINUTES.toMillis(10); @@ -71,9 +68,6 @@ public class ParameterProvider { public static final Constants.BdecParquetCompression BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT = Constants.BdecParquetCompression.GZIP; - /* Iceberg mode parameters: When streaming to Iceberg mode, different default parameters are required because it generates Parquet files instead of BDEC files. */ - public static final int MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_ICEBERG_MODE_DEFAULT = 1; - /* Parameter that enables using internal Parquet buffers for buffering of rows before serializing. It reduces memory consumption compared to using Java Objects for buffering.*/ public static final boolean ENABLE_PARQUET_INTERNAL_BUFFERING_DEFAULT = false; @@ -90,25 +84,18 @@ public class ParameterProvider { * * @param parameterOverrides Map of parameter name to value * @param props Properties from profile file - * @param isIcebergMode If the provided parameters need to be verified and modified to meet - * Iceberg mode */ - public ParameterProvider( - Map parameterOverrides, Properties props, boolean isIcebergMode) { - this.setParameterMap(parameterOverrides, props, isIcebergMode); + public ParameterProvider(Map parameterOverrides, Properties props) { + this.setParameterMap(parameterOverrides, props); } /** Empty constructor for tests */ - public ParameterProvider(boolean isIcebergMode) { - this(null, null, isIcebergMode); + public ParameterProvider() { + this(null, null); } - private void checkAndUpdate( - String key, - Object defaultValue, - Map parameterOverrides, - Properties props, - boolean enforceDefault) { + private void updateValue( + String key, Object defaultValue, Map parameterOverrides, Properties props) { if (parameterOverrides != null && props != null) { this.parameterMap.put( key, parameterOverrides.getOrDefault(key, props.getOrDefault(key, defaultValue))); @@ -116,19 +103,6 @@ private void checkAndUpdate( this.parameterMap.put(key, parameterOverrides.getOrDefault(key, defaultValue)); } else if (props != null) { this.parameterMap.put(key, props.getOrDefault(key, defaultValue)); - } else { - this.parameterMap.put(key, defaultValue); - } - - if (enforceDefault) { - if (!this.parameterMap.getOrDefault(key, defaultValue).equals(defaultValue)) { - throw new SFException( - INVALID_CONFIG_PARAMETER, - String.format( - "The value %s for %s is not configurable, should be %s.", - this.parameterMap.get(key), key, defaultValue)); - } - this.parameterMap.put(key, defaultValue); } } @@ -137,11 +111,8 @@ private void checkAndUpdate( * * @param parameterOverrides Map of parameter name -> value * @param props Properties file provided to client constructor - * @param isIcebergMode If the provided parameters need to be verified and modified to meet - * Iceberg mode */ - private void setParameterMap( - Map parameterOverrides, Properties props, boolean isIcebergMode) { + private void setParameterMap(Map parameterOverrides, Properties props) { // BUFFER_FLUSH_INTERVAL_IN_MILLIS is deprecated and disallowed if ((parameterOverrides != null && parameterOverrides.containsKey(BUFFER_FLUSH_INTERVAL_IN_MILLIS)) @@ -152,101 +123,75 @@ private void setParameterMap( BUFFER_FLUSH_INTERVAL_IN_MILLIS, MAX_CLIENT_LAG)); } - this.checkAndUpdate( + this.updateValue( BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS, BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS_DEFAULT, parameterOverrides, - props, - false); + props); - this.checkAndUpdate( + this.updateValue( INSERT_THROTTLE_INTERVAL_IN_MILLIS, INSERT_THROTTLE_INTERVAL_IN_MILLIS_DEFAULT, parameterOverrides, - props, - false); + props); - this.checkAndUpdate( + this.updateValue( INSERT_THROTTLE_THRESHOLD_IN_PERCENTAGE, INSERT_THROTTLE_THRESHOLD_IN_PERCENTAGE_DEFAULT, parameterOverrides, - props, - false); + props); - this.checkAndUpdate( + this.updateValue( INSERT_THROTTLE_THRESHOLD_IN_BYTES, INSERT_THROTTLE_THRESHOLD_IN_BYTES_DEFAULT, parameterOverrides, - props, - false); + props); - this.checkAndUpdate( + this.updateValue( ENABLE_SNOWPIPE_STREAMING_METRICS, SNOWPIPE_STREAMING_METRICS_DEFAULT, parameterOverrides, - props, - false); + props); - this.checkAndUpdate( - BLOB_FORMAT_VERSION, BLOB_FORMAT_VERSION_DEFAULT, parameterOverrides, props, false); + this.updateValue(BLOB_FORMAT_VERSION, BLOB_FORMAT_VERSION_DEFAULT, parameterOverrides, props); getBlobFormatVersion(); // to verify parsing the configured value - this.checkAndUpdate( - IO_TIME_CPU_RATIO, IO_TIME_CPU_RATIO_DEFAULT, parameterOverrides, props, false); + this.updateValue(IO_TIME_CPU_RATIO, IO_TIME_CPU_RATIO_DEFAULT, parameterOverrides, props); - this.checkAndUpdate( + this.updateValue( BLOB_UPLOAD_MAX_RETRY_COUNT, BLOB_UPLOAD_MAX_RETRY_COUNT_DEFAULT, parameterOverrides, - props, - false); + props); - this.checkAndUpdate( - MAX_MEMORY_LIMIT_IN_BYTES, - MAX_MEMORY_LIMIT_IN_BYTES_DEFAULT, - parameterOverrides, - props, - false); + this.updateValue( + MAX_MEMORY_LIMIT_IN_BYTES, MAX_MEMORY_LIMIT_IN_BYTES_DEFAULT, parameterOverrides, props); - this.checkAndUpdate( + this.updateValue( ENABLE_PARQUET_INTERNAL_BUFFERING, ENABLE_PARQUET_INTERNAL_BUFFERING_DEFAULT, parameterOverrides, - props, - false); + props); - this.checkAndUpdate( - MAX_CHANNEL_SIZE_IN_BYTES, - MAX_CHANNEL_SIZE_IN_BYTES_DEFAULT, - parameterOverrides, - props, - false); + this.updateValue( + MAX_CHANNEL_SIZE_IN_BYTES, MAX_CHANNEL_SIZE_IN_BYTES_DEFAULT, parameterOverrides, props); - this.checkAndUpdate( - MAX_CHUNK_SIZE_IN_BYTES, MAX_CHUNK_SIZE_IN_BYTES_DEFAULT, parameterOverrides, props, false); + this.updateValue( + MAX_CHUNK_SIZE_IN_BYTES, MAX_CHUNK_SIZE_IN_BYTES_DEFAULT, parameterOverrides, props); - this.checkAndUpdate( - MAX_CLIENT_LAG, - isIcebergMode ? MAX_CLIENT_LAG_ICEBERG_MODE_DEFAULT : MAX_CLIENT_LAG_DEFAULT, - parameterOverrides, - props, - false); + this.updateValue(MAX_CLIENT_LAG, MAX_CLIENT_LAG_DEFAULT, parameterOverrides, props); - this.checkAndUpdate( + this.updateValue( MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST, - isIcebergMode - ? MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_ICEBERG_MODE_DEFAULT - : MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_DEFAULT, + MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_DEFAULT, parameterOverrides, - props, - isIcebergMode); + props); - this.checkAndUpdate( + this.updateValue( BDEC_PARQUET_COMPRESSION_ALGORITHM, BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT, parameterOverrides, - props, - false); + props); } /** @return Longest interval in milliseconds between buffer flushes */ diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java index 32a352ba9..4bdb695d8 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java @@ -27,21 +27,18 @@ public void testSerializationErrors() throws Exception { BlobBuilder.constructBlobAndMetadata( "a.bdec", Collections.singletonList(createChannelDataPerTable(1, false)), - Constants.BdecVersion.THREE, - true); + Constants.BdecVersion.THREE); BlobBuilder.constructBlobAndMetadata( "a.bdec", Collections.singletonList(createChannelDataPerTable(1, true)), - Constants.BdecVersion.THREE, - true); + Constants.BdecVersion.THREE); // Construction fails if metadata contains 0 rows and data 1 row try { BlobBuilder.constructBlobAndMetadata( "a.bdec", Collections.singletonList(createChannelDataPerTable(0, false)), - Constants.BdecVersion.THREE, - true); + Constants.BdecVersion.THREE); Assert.fail("Should not pass enableParquetInternalBuffering=false"); } catch (SFException e) { Assert.assertEquals(ErrorCode.INTERNAL_ERROR.getMessageCode(), e.getVendorCode()); @@ -59,8 +56,7 @@ public void testSerializationErrors() throws Exception { BlobBuilder.constructBlobAndMetadata( "a.bdec", Collections.singletonList(createChannelDataPerTable(0, true)), - Constants.BdecVersion.THREE, - true); + Constants.BdecVersion.THREE); Assert.fail("Should not pass enableParquetInternalBuffering=true"); } catch (SFException e) { Assert.assertEquals(ErrorCode.INTERNAL_ERROR.getMessageCode(), e.getVendorCode()); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java index 7c6d797b4..ecc8320a9 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/ChannelCacheTest.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + package net.snowflake.ingest.streaming.internal; import static java.time.ZoneOffset.UTC; @@ -24,7 +28,7 @@ public class ChannelCacheTest { @Before public void setup() { cache = new ChannelCache<>(); - client = new SnowflakeStreamingIngestClientInternal<>("client", false); + client = new SnowflakeStreamingIngestClientInternal<>("client"); channel1 = new SnowflakeStreamingIngestChannelInternal<>( "channel1", diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java index f59f9e076..c5949d570 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java @@ -11,7 +11,6 @@ import static net.snowflake.ingest.utils.Constants.BLOB_NO_HEADER; import static net.snowflake.ingest.utils.Constants.BLOB_TAG_SIZE_IN_BYTES; import static net.snowflake.ingest.utils.Constants.BLOB_VERSION_SIZE_IN_BYTES; -import static net.snowflake.ingest.utils.ParameterProvider.MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_ICEBERG_MODE_DEFAULT; import static net.snowflake.ingest.utils.ParameterProvider.MAX_CHUNK_SIZE_IN_BYTES_DEFAULT; import com.codahale.metrics.Histogram; @@ -53,22 +52,11 @@ import net.snowflake.ingest.utils.SFException; import org.junit.Assert; import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatchers; import org.mockito.Mockito; -@RunWith(Parameterized.class) public class FlushServiceTest { - - @Parameterized.Parameters(name = "isIcebergMode: {0}") - public static Object[] isIcebergMode() { - return new Object[] {false, true}; - } - - @Parameterized.Parameter public static boolean isIcebergMode; - public FlushServiceTest() { this.testContextFactory = ParquetTestContext.createFactory(); } @@ -96,23 +84,16 @@ private abstract static class TestContext implements AutoCloseable { StorageManager storageManager; StreamingIngestStorage storage; ParameterProvider parameterProvider; - InternalParameterProvider internalParameterProvider; RegisterService registerService; final List> channelData = new ArrayList<>(); TestContext() { storage = Mockito.mock(StreamingIngestStorage.class); - parameterProvider = new ParameterProvider(isIcebergMode); - internalParameterProvider = new InternalParameterProvider(isIcebergMode); + parameterProvider = new ParameterProvider(); client = Mockito.mock(SnowflakeStreamingIngestClientInternal.class); Mockito.when(client.getParameterProvider()).thenReturn(parameterProvider); - Mockito.when(client.getInternalParameterProvider()).thenReturn(internalParameterProvider); - storageManager = - Mockito.spy( - isIcebergMode - ? new ExternalVolumeManager<>(true, "role", "client", null) - : new InternalStageManager<>(true, "role", "client", null)); + storageManager = Mockito.spy(new InternalStageManager<>(true, "role", "client", null)); Mockito.doReturn(storage).when(storageManager).getStorage(ArgumentMatchers.any()); Mockito.when(storageManager.getClientPrefix()).thenReturn("client_prefix"); channelCache = new ChannelCache<>(); @@ -421,38 +402,33 @@ public void testGetFilePath() { StorageManager storageManager = testContext.storageManager; Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC")); String clientPrefix = "honk"; - if (isIcebergMode) { - // TODO: SNOW-1502887 Blob path generation for iceberg table - String outputString = storageManager.generateBlobPath(); - } else { - String outputString = - ((InternalStageManager) storageManager).getBlobPath(calendar, clientPrefix); - Path outputPath = Paths.get(outputString); - Assert.assertTrue(outputPath.getFileName().toString().contains(clientPrefix)); - Assert.assertTrue( - calendar.get(Calendar.MINUTE) - - Integer.parseInt(outputPath.getParent().getFileName().toString()) - <= 1); - Assert.assertEquals( - Integer.toString(calendar.get(Calendar.HOUR_OF_DAY)), - outputPath.getParent().getParent().getFileName().toString()); - Assert.assertEquals( - Integer.toString(calendar.get(Calendar.DAY_OF_MONTH)), - outputPath.getParent().getParent().getParent().getFileName().toString()); - Assert.assertEquals( - Integer.toString(calendar.get(Calendar.MONTH) + 1), - outputPath.getParent().getParent().getParent().getParent().getFileName().toString()); - Assert.assertEquals( - Integer.toString(calendar.get(Calendar.YEAR)), - outputPath - .getParent() - .getParent() - .getParent() - .getParent() - .getParent() - .getFileName() - .toString()); - } + String outputString = + ((InternalStageManager) storageManager).getBlobPath(calendar, clientPrefix); + Path outputPath = Paths.get(outputString); + Assert.assertTrue(outputPath.getFileName().toString().contains(clientPrefix)); + Assert.assertTrue( + calendar.get(Calendar.MINUTE) + - Integer.parseInt(outputPath.getParent().getFileName().toString()) + <= 1); + Assert.assertEquals( + Integer.toString(calendar.get(Calendar.HOUR_OF_DAY)), + outputPath.getParent().getParent().getFileName().toString()); + Assert.assertEquals( + Integer.toString(calendar.get(Calendar.DAY_OF_MONTH)), + outputPath.getParent().getParent().getParent().getFileName().toString()); + Assert.assertEquals( + Integer.toString(calendar.get(Calendar.MONTH) + 1), + outputPath.getParent().getParent().getParent().getParent().getFileName().toString()); + Assert.assertEquals( + Integer.toString(calendar.get(Calendar.YEAR)), + outputPath + .getParent() + .getParent() + .getParent() + .getParent() + .getParent() + .getFileName() + .toString()); } @Test @@ -624,9 +600,7 @@ public void runTestBlobSplitDueToNumberOfChunks(int numberOfRows) throws Excepti Math.ceil( (double) numberOfRows / channelsPerTable - / (isIcebergMode - ? MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_ICEBERG_MODE_DEFAULT - : ParameterProvider.MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_DEFAULT)); + / ParameterProvider.MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_DEFAULT); final TestContext>> testContext = testContextFactory.create(); @@ -904,7 +878,7 @@ public void testInvalidateChannels() { // Create a new Client in order to not interfere with other tests SnowflakeStreamingIngestClientInternal client = Mockito.mock(SnowflakeStreamingIngestClientInternal.class); - ParameterProvider parameterProvider = new ParameterProvider(isIcebergMode); + ParameterProvider parameterProvider = new ParameterProvider(); ChannelCache channelCache = new ChannelCache<>(); Mockito.when(client.getChannelCache()).thenReturn(channelCache); Mockito.when(client.getParameterProvider()).thenReturn(parameterProvider); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/InsertRowsBenchmarkTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/InsertRowsBenchmarkTest.java index d28b22669..51f53f660 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/InsertRowsBenchmarkTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/InsertRowsBenchmarkTest.java @@ -40,7 +40,7 @@ public class InsertRowsBenchmarkTest { @Setup(Level.Trial) public void setUpBeforeAll() { - client = new SnowflakeStreamingIngestClientInternal("client_PARQUET", false); + client = new SnowflakeStreamingIngestClientInternal("client_PARQUET"); channel = new SnowflakeStreamingIngestChannelInternal<>( "channel", diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/InternalParameterProviderTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/InternalParameterProviderTest.java deleted file mode 100644 index e716b3ddb..000000000 --- a/src/test/java/net/snowflake/ingest/streaming/internal/InternalParameterProviderTest.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Copyright (c) 2024. Snowflake Computing Inc. All rights reserved. - */ - -package net.snowflake.ingest.streaming.internal; - -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -@RunWith(Parameterized.class) -public class InternalParameterProviderTest { - @Parameterized.Parameters(name = "isIcebergMode: {0}") - public static Object[] isIcebergMode() { - return new Object[] {false, true}; - } - - @Parameterized.Parameter public static boolean isIcebergMode; - - @Test - public void testConstantParameterProvider() { - InternalParameterProvider internalParameterProvider = - new InternalParameterProvider(isIcebergMode); - assert internalParameterProvider.getEnableChunkEncryption() == !isIcebergMode; - } -} diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/OAuthBasicTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/OAuthBasicTest.java index 0c0c5bb85..c7bf5d07d 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/OAuthBasicTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/OAuthBasicTest.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + package net.snowflake.ingest.streaming.internal; import java.util.Properties; @@ -114,7 +118,7 @@ public void testCreateOAuthClient() throws Exception { @Test public void testSetRefreshToken() throws Exception { SnowflakeStreamingIngestClientInternal client = - new SnowflakeStreamingIngestClientInternal<>("TEST_CLIENT", false); + new SnowflakeStreamingIngestClientInternal<>("TEST_CLIENT"); MockOAuthClient mockOAuthClient = new MockOAuthClient(); OAuthManager oAuthManager = diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java index c4631b348..632df4090 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java @@ -1,6 +1,8 @@ -package net.snowflake.ingest.streaming.internal; +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ -import static net.snowflake.ingest.utils.ParameterProvider.MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_ICEBERG_MODE_DEFAULT; +package net.snowflake.ingest.streaming.internal; import java.util.Arrays; import java.util.HashMap; @@ -8,9 +10,7 @@ import java.util.Map; import java.util.Properties; import net.snowflake.ingest.utils.Constants; -import net.snowflake.ingest.utils.ErrorCode; import net.snowflake.ingest.utils.ParameterProvider; -import net.snowflake.ingest.utils.SFException; import org.junit.Assert; import org.junit.Test; @@ -35,7 +35,7 @@ private Map getStartingParameterMap() { public void withValuesSet() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); Assert.assertEquals(1000L, parameterProvider.getCachedMaxClientLagInMs()); Assert.assertEquals(4L, parameterProvider.getBufferFlushCheckIntervalInMs()); @@ -58,7 +58,7 @@ public void withNullProps() { parameterMap.put(ParameterProvider.BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS, 4L); parameterMap.put(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_PERCENTAGE, 6); parameterMap.put(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_BYTES, 1024); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, null, false); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, null); Assert.assertEquals(3000, parameterProvider.getCachedMaxClientLagInMs()); Assert.assertEquals(4, parameterProvider.getBufferFlushCheckIntervalInMs()); @@ -76,7 +76,7 @@ public void withNullParameterMap() { props.put(ParameterProvider.BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS, 4L); props.put(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_PERCENTAGE, 6); props.put(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_BYTES, 1024); - ParameterProvider parameterProvider = new ParameterProvider(null, props, false); + ParameterProvider parameterProvider = new ParameterProvider(null, props); Assert.assertEquals(3000, parameterProvider.getCachedMaxClientLagInMs()); Assert.assertEquals(4, parameterProvider.getBufferFlushCheckIntervalInMs()); @@ -89,7 +89,7 @@ public void withNullParameterMap() { @Test public void withNullInputs() { - ParameterProvider parameterProvider = new ParameterProvider(null, null, false); + ParameterProvider parameterProvider = new ParameterProvider(null, null); Assert.assertEquals( ParameterProvider.MAX_CLIENT_LAG_DEFAULT, parameterProvider.getCachedMaxClientLagInMs()); @@ -109,7 +109,7 @@ public void withNullInputs() { @Test public void withDefaultValues() { - ParameterProvider parameterProvider = new ParameterProvider(false); + ParameterProvider parameterProvider = new ParameterProvider(); Assert.assertEquals( ParameterProvider.MAX_CLIENT_LAG_DEFAULT, parameterProvider.getCachedMaxClientLagInMs()); @@ -141,50 +141,12 @@ public void withDefaultValues() { parameterProvider.getBdecParquetCompressionAlgorithm()); } - @Test - public void withIcebergDefaultValues() { - ParameterProvider parameterProvider = new ParameterProvider(true); - - Assert.assertEquals( - ParameterProvider.MAX_CLIENT_LAG_ICEBERG_MODE_DEFAULT, - parameterProvider.getCachedMaxClientLagInMs()); - Assert.assertEquals( - ParameterProvider.BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS_DEFAULT, - parameterProvider.getBufferFlushCheckIntervalInMs()); - Assert.assertEquals( - ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_PERCENTAGE_DEFAULT, - parameterProvider.getInsertThrottleThresholdInPercentage()); - Assert.assertEquals( - ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_BYTES_DEFAULT, - parameterProvider.getInsertThrottleThresholdInBytes()); - Assert.assertEquals( - ParameterProvider.INSERT_THROTTLE_INTERVAL_IN_MILLIS_DEFAULT, - parameterProvider.getInsertThrottleIntervalInMs()); - Assert.assertEquals( - ParameterProvider.IO_TIME_CPU_RATIO_DEFAULT, parameterProvider.getIOTimeCpuRatio()); - Assert.assertEquals( - ParameterProvider.BLOB_UPLOAD_MAX_RETRY_COUNT_DEFAULT, - parameterProvider.getBlobUploadMaxRetryCount()); - Assert.assertEquals( - ParameterProvider.MAX_MEMORY_LIMIT_IN_BYTES_DEFAULT, - parameterProvider.getMaxMemoryLimitInBytes()); - Assert.assertEquals( - ParameterProvider.MAX_CHANNEL_SIZE_IN_BYTES_DEFAULT, - parameterProvider.getMaxChannelSizeInBytes()); - Assert.assertEquals( - ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT, - parameterProvider.getBdecParquetCompressionAlgorithm()); - Assert.assertEquals( - MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_ICEBERG_MODE_DEFAULT, - parameterProvider.getMaxChunksInBlobAndRegistrationRequest()); - } - @Test public void testMaxClientLagEnabled() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "2 second"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); Assert.assertEquals(2000, parameterProvider.getCachedMaxClientLagInMs()); // call again to trigger caching logic Assert.assertEquals(2000, parameterProvider.getCachedMaxClientLagInMs()); @@ -195,7 +157,7 @@ public void testMaxClientLagEnabledPluralTimeUnit() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "2 seconds"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); Assert.assertEquals(2000, parameterProvider.getCachedMaxClientLagInMs()); } @@ -204,7 +166,7 @@ public void testMaxClientLagEnabledMinuteTimeUnit() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "1 minute"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); Assert.assertEquals(60000, parameterProvider.getCachedMaxClientLagInMs()); } @@ -213,7 +175,7 @@ public void testMaxClientLagEnabledMinuteTimeUnitPluralTimeUnit() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "2 minutes"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); Assert.assertEquals(120000, parameterProvider.getCachedMaxClientLagInMs()); } @@ -221,7 +183,7 @@ public void testMaxClientLagEnabledMinuteTimeUnitPluralTimeUnit() { public void testMaxClientLagEnabledDefaultValue() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); Assert.assertEquals( ParameterProvider.MAX_CLIENT_LAG_DEFAULT, parameterProvider.getCachedMaxClientLagInMs()); } @@ -231,7 +193,7 @@ public void testMaxClientLagEnabledDefaultUnit() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "3000"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); Assert.assertEquals(3000, parameterProvider.getCachedMaxClientLagInMs()); } @@ -240,7 +202,7 @@ public void testMaxClientLagEnabledLongInput() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, 3000L); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); Assert.assertEquals(3000, parameterProvider.getCachedMaxClientLagInMs()); } @@ -249,7 +211,7 @@ public void testMaxClientLagEnabledMissingUnitTimeUnitSupplied() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, " year"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); try { parameterProvider.getCachedMaxClientLagInMs(); Assert.fail("Should not have succeeded"); @@ -263,7 +225,7 @@ public void testMaxClientLagEnabledInvalidTimeUnit() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "1 year"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); try { parameterProvider.getCachedMaxClientLagInMs(); Assert.fail("Should not have succeeded"); @@ -277,7 +239,7 @@ public void testMaxClientLagEnabledInvalidUnit() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "banana minute"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); try { parameterProvider.getCachedMaxClientLagInMs(); Assert.fail("Should not have succeeded"); @@ -291,7 +253,7 @@ public void testMaxClientLagEnabledThresholdBelow() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "0 second"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); try { parameterProvider.getCachedMaxClientLagInMs(); Assert.fail("Should not have succeeded"); @@ -305,7 +267,7 @@ public void testMaxClientLagEnabledThresholdAbove() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, "11 minutes"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); try { parameterProvider.getCachedMaxClientLagInMs(); Assert.fail("Should not have succeeded"); @@ -319,7 +281,7 @@ public void testMaxClientLagEnableEmptyInput() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.MAX_CLIENT_LAG, ""); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); try { parameterProvider.getCachedMaxClientLagInMs(); Assert.fail("Should not have succeeded"); @@ -333,20 +295,8 @@ public void testMaxChunksInBlobAndRegistrationRequest() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put("max_chunks_in_blob_and_registration_request", 1); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); - Assert.assertEquals(1, parameterProvider.getMaxChunksInBlobAndRegistrationRequest()); - - parameterProvider = new ParameterProvider(parameterMap, prop, true); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); Assert.assertEquals(1, parameterProvider.getMaxChunksInBlobAndRegistrationRequest()); - - SFException e = - Assert.assertThrows( - SFException.class, - () -> { - parameterMap.put("max_chunks_in_blob_and_registration_request", 100); - new ParameterProvider(parameterMap, prop, true); - }); - Assert.assertEquals(e.getVendorCode(), ErrorCode.INVALID_CONFIG_PARAMETER.getMessageCode()); } @Test @@ -357,7 +307,7 @@ public void testValidCompressionAlgorithmsAndWithUppercaseLowerCase() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM, v); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); Assert.assertEquals( Constants.BdecParquetCompression.GZIP, parameterProvider.getBdecParquetCompressionAlgorithm()); @@ -368,7 +318,7 @@ public void testValidCompressionAlgorithmsAndWithUppercaseLowerCase() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM, v); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); Assert.assertEquals( Constants.BdecParquetCompression.ZSTD, parameterProvider.getBdecParquetCompressionAlgorithm()); @@ -380,7 +330,7 @@ public void testInvalidCompressionAlgorithm() { Properties prop = new Properties(); Map parameterMap = getStartingParameterMap(); parameterMap.put(ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM, "invalid_comp"); - ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop, false); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); try { parameterProvider.getBdecParquetCompressionAlgorithm(); Assert.fail("Should not have succeeded"); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/RegisterServiceTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/RegisterServiceTest.java index 4eaea15a4..a9fa38622 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/RegisterServiceTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/RegisterServiceTest.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + package net.snowflake.ingest.streaming.internal; import static net.snowflake.ingest.utils.Constants.BLOB_UPLOAD_TIMEOUT_IN_SEC; @@ -14,17 +18,8 @@ import org.junit.Assert; import org.junit.Ignore; import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -@RunWith(Parameterized.class) public class RegisterServiceTest { - @Parameterized.Parameters(name = "isIcebergMode: {0}") - public static Object[] isIcebergMode() { - return new Object[] {false, true}; - } - - @Parameterized.Parameter public boolean isIcebergMode; @Test public void testRegisterService() throws ExecutionException, InterruptedException { @@ -54,7 +49,7 @@ public void testRegisterService() throws ExecutionException, InterruptedExceptio @Test public void testRegisterServiceTimeoutException() throws Exception { SnowflakeStreamingIngestClientInternal client = - new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode); + new SnowflakeStreamingIngestClientInternal<>("client"); RegisterService rs = new RegisterService<>(client, true); Pair, CompletableFuture> blobFuture1 = @@ -82,7 +77,7 @@ public void testRegisterServiceTimeoutException() throws Exception { @Test public void testRegisterServiceTimeoutException_testRetries() throws Exception { SnowflakeStreamingIngestClientInternal client = - new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode); + new SnowflakeStreamingIngestClientInternal<>("client"); RegisterService rs = new RegisterService<>(client, true); Pair, CompletableFuture> blobFuture1 = @@ -116,7 +111,7 @@ public void testRegisterServiceTimeoutException_testRetries() throws Exception { @Test public void testRegisterServiceNonTimeoutException() { SnowflakeStreamingIngestClientInternal client = - new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode); + new SnowflakeStreamingIngestClientInternal<>("client"); RegisterService rs = new RegisterService<>(client, true); CompletableFuture future = new CompletableFuture<>(); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java index dc3285df8..5d8d8d36a 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java @@ -48,11 +48,8 @@ import net.snowflake.ingest.utils.Utils; import org.junit.Assert; import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; import org.mockito.Mockito; -@RunWith(Parameterized.class) public class SnowflakeStreamingIngestChannelTest { /** @@ -74,13 +71,6 @@ public long getFreeMemory() { } } - @Parameterized.Parameters(name = "isIcebergMode: {0}") - public static Object[] isIcebergMode() { - return new Object[] {false, true}; - } - - @Parameterized.Parameter public boolean isIcebergMode; - @Test public void testChannelFactoryNullFields() { String name = "CHANNEL"; @@ -90,7 +80,7 @@ public void testChannelFactoryNullFields() { long channelSequencer = 0L; long rowSequencer = 0L; SnowflakeStreamingIngestClientInternal client = - new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode); + new SnowflakeStreamingIngestClientInternal<>("client"); Object[] fields = new Object[] { @@ -132,7 +122,7 @@ public void testChannelFactorySuccess() { long rowSequencer = 0L; SnowflakeStreamingIngestClientInternal client = - new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode); + new SnowflakeStreamingIngestClientInternal<>("client"); SnowflakeStreamingIngestChannelInternal channel = SnowflakeStreamingIngestChannelFactory.builder(name) @@ -167,7 +157,7 @@ public void testChannelFactorySuccess() { @Test public void testChannelValid() { SnowflakeStreamingIngestClientInternal client = - new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode); + new SnowflakeStreamingIngestClientInternal<>("client"); SnowflakeStreamingIngestChannelInternal channel = new SnowflakeStreamingIngestChannelInternal<>( "channel", @@ -217,7 +207,7 @@ public void testChannelValid() { @Test public void testChannelClose() { SnowflakeStreamingIngestClientInternal client = - new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode); + new SnowflakeStreamingIngestClientInternal<>("client"); SnowflakeStreamingIngestChannelInternal channel = new SnowflakeStreamingIngestChannelInternal<>( "channel", @@ -361,7 +351,6 @@ public void testOpenChannelErrorResponse() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, - isIcebergMode, true, requestBuilder, null); @@ -432,7 +421,6 @@ public void testOpenChannelSnowflakeInternalErrorResponse() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, - isIcebergMode, true, requestBuilder, null); @@ -455,10 +443,6 @@ public void testOpenChannelSnowflakeInternalErrorResponse() throws Exception { @Test public void testOpenChannelSuccessResponse() throws Exception { - // TODO: SNOW-1490151 Iceberg testing gaps - if (isIcebergMode) { - return; - } String name = "CHANNEL"; String dbName = "STREAMINGINGEST_TEST"; String schemaName = "PUBLIC"; @@ -519,7 +503,6 @@ public void testOpenChannelSuccessResponse() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, - isIcebergMode, true, requestBuilder, null); @@ -558,9 +541,7 @@ public void testOpenChannelSuccessResponse() throws Exception { @Test public void testInsertRow() { SnowflakeStreamingIngestClientInternal client; - client = - new SnowflakeStreamingIngestClientInternal( - "client_PARQUET", isIcebergMode); + client = new SnowflakeStreamingIngestClientInternal("client_PARQUET"); SnowflakeStreamingIngestChannelInternal channel = new SnowflakeStreamingIngestChannelInternal<>( "channel", @@ -644,8 +625,7 @@ public void testInsertTooLargeRow() { schema.forEach(x -> row.put(x.getName(), byteArrayOneMb)); SnowflakeStreamingIngestClientInternal client; - client = - new SnowflakeStreamingIngestClientInternal("test_client", isIcebergMode); + client = new SnowflakeStreamingIngestClientInternal("test_client"); // Test channel with on error CONTINUE SnowflakeStreamingIngestChannelInternal channel = @@ -729,7 +709,7 @@ public void testInsertRowThrottling() { memoryInfoProvider.maxMemory = maxMemory; SnowflakeStreamingIngestClientInternal client = - new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode); + new SnowflakeStreamingIngestClientInternal<>("client"); SnowflakeStreamingIngestChannelInternal channel = new SnowflakeStreamingIngestChannelInternal<>( "channel", @@ -745,7 +725,7 @@ public void testInsertRowThrottling() { OpenChannelRequest.OnErrorOption.CONTINUE, UTC); - ParameterProvider parameterProvider = new ParameterProvider(isIcebergMode); + ParameterProvider parameterProvider = new ParameterProvider(); memoryInfoProvider.freeMemory = maxMemory * (parameterProvider.getInsertThrottleThresholdInPercentage() - 1) / 100; @@ -775,7 +755,7 @@ public void testInsertRowThrottling() { @Test public void testFlush() throws Exception { SnowflakeStreamingIngestClientInternal client = - Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode)); + Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client")); SnowflakeStreamingIngestChannelInternal channel = new SnowflakeStreamingIngestChannelInternal<>( "channel", @@ -811,7 +791,7 @@ public void testFlush() throws Exception { @Test public void testClose() throws Exception { SnowflakeStreamingIngestClientInternal client = - Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode)); + Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client")); SnowflakeStreamingIngestChannel channel = new SnowflakeStreamingIngestChannelInternal<>( "channel", @@ -845,7 +825,7 @@ public void testClose() throws Exception { @Test public void testDropOnClose() throws Exception { SnowflakeStreamingIngestClientInternal client = - Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode)); + Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client")); SnowflakeStreamingIngestChannelInternal channel = new SnowflakeStreamingIngestChannelInternal<>( "channel", @@ -882,7 +862,7 @@ public void testDropOnClose() throws Exception { @Test public void testDropOnCloseInvalidChannel() throws Exception { SnowflakeStreamingIngestClientInternal client = - Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode)); + Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client")); SnowflakeStreamingIngestChannelInternal channel = new SnowflakeStreamingIngestChannelInternal<>( "channel", @@ -915,7 +895,7 @@ public void testDropOnCloseInvalidChannel() throws Exception { public void testGetLatestCommittedOffsetToken() { String offsetToken = "10"; SnowflakeStreamingIngestClientInternal client = - Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode)); + Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client")); SnowflakeStreamingIngestChannel channel = new SnowflakeStreamingIngestChannelInternal<>( "channel", diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java index 5375adbec..23970e22b 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java @@ -70,12 +70,9 @@ import org.junit.Before; import org.junit.Ignore; import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; import org.mockito.ArgumentMatchers; import org.mockito.Mockito; -@RunWith(Parameterized.class) public class SnowflakeStreamingIngestClientTest { private static final ObjectMapper objectMapper = new ObjectMapper(); @@ -86,14 +83,6 @@ public class SnowflakeStreamingIngestClientTest { SnowflakeStreamingIngestChannelInternal channel3; SnowflakeStreamingIngestChannelInternal channel4; - // TODO: Add IcebergMode = True after Streaming to Iceberg is supported. - @Parameterized.Parameters(name = "isIcebergMode: {0}") - public static Object[] isIcebergMode() { - return new Object[] {false}; - } - - @Parameterized.Parameter public boolean isIcebergMode; - @Before public void setup() throws Exception { objectMapper.setVisibility(PropertyAccessor.GETTER, JsonAutoDetect.Visibility.ANY); @@ -113,7 +102,6 @@ public void setup() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, - isIcebergMode, true, requestBuilder, null); @@ -200,7 +188,6 @@ public void testConstructorParameters() throws Exception { SnowflakeStreamingIngestClientFactory.builder("client") .setProperties(prop) .setParameterOverrides(parameterMap) - .setIsIceberg(isIcebergMode) .setIsTestMode(true) .build(); @@ -228,7 +215,6 @@ public void testClientFactoryWithJmxMetrics() throws Exception { .setProperties(prop) .setParameterOverrides( Collections.singletonMap(ENABLE_SNOWPIPE_STREAMING_METRICS, true)) - .setIsIceberg(isIcebergMode) .build(); Assert.assertEquals("client", client.getName()); @@ -380,7 +366,6 @@ public void testGetChannelsStatusWithRequest() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, - isIcebergMode, true, requestBuilder, null); @@ -440,7 +425,6 @@ public void testDropChannel() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, - isIcebergMode, true, requestBuilder, null); @@ -486,7 +470,6 @@ public void testGetChannelsStatusWithRequestError() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, - isIcebergMode, true, requestBuilder, null); @@ -540,7 +523,6 @@ public void testRegisterBlobRequestCreationSuccess() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, - isIcebergMode, true, requestBuilder, null); @@ -732,7 +714,6 @@ public void testGetRetryBlobs() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, - isIcebergMode, true, requestBuilder, null); @@ -774,7 +755,6 @@ public void testRegisterBlobErrorResponse() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, - isIcebergMode, true, requestBuilder, null); @@ -822,7 +802,6 @@ public void testRegisterBlobSnowflakeInternalErrorResponse() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, - isIcebergMode, true, requestBuilder, null); @@ -879,7 +858,6 @@ public void testRegisterBlobSuccessResponse() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, - isIcebergMode, true, requestBuilder, null); @@ -964,7 +942,6 @@ public void testRegisterBlobsRetries() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, - isIcebergMode, true, requestBuilder, null); @@ -994,7 +971,6 @@ public void testRegisterBlobChunkLimit() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, - isIcebergMode, true, requestBuilder, null)); @@ -1167,7 +1143,6 @@ public void testRegisterBlobsRetriesSucceeds() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, - isIcebergMode, true, requestBuilder, null); @@ -1242,7 +1217,6 @@ public void testRegisterBlobResponseWithInvalidChannel() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, - isIcebergMode, true, requestBuilder, null); @@ -1293,7 +1267,7 @@ public void testRegisterBlobResponseWithInvalidChannel() throws Exception { @Test public void testFlush() throws Exception { SnowflakeStreamingIngestClientInternal client = - Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode)); + Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client")); ChannelsStatusResponse response = new ChannelsStatusResponse(); response.setStatusCode(0L); response.setMessage("Success"); @@ -1315,7 +1289,7 @@ public void testFlush() throws Exception { @Test public void testClose() throws Exception { SnowflakeStreamingIngestClientInternal client = - Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode)); + Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client")); ChannelsStatusResponse response = new ChannelsStatusResponse(); response.setStatusCode(0L); response.setMessage("Success"); @@ -1349,7 +1323,7 @@ public void testClose() throws Exception { @Test public void testCloseWithError() throws Exception { SnowflakeStreamingIngestClientInternal client = - Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode)); + Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client")); CompletableFuture future = new CompletableFuture<>(); future.completeExceptionally(new Exception("Simulating Error")); @@ -1387,7 +1361,7 @@ public void testCloseWithError() throws Exception { @Test public void testVerifyChannelsAreFullyCommittedSuccess() throws Exception { SnowflakeStreamingIngestClientInternal client = - Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode)); + Mockito.spy(new SnowflakeStreamingIngestClientInternal<>("client")); SnowflakeStreamingIngestChannelInternal channel = new SnowflakeStreamingIngestChannelInternal<>( "channel1", @@ -1435,7 +1409,6 @@ public void testFlushServiceException() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, - isIcebergMode, true, requestBuilder, parameterMap); @@ -1472,7 +1445,6 @@ public void testGetLatestCommittedOffsetTokens() throws Exception { new SnowflakeURL("snowflake.dev.local:8082"), null, httpClient, - isIcebergMode, true, requestBuilder, null); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestStorageTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestStorageTest.java index 478934f4b..f320c99aa 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestStorageTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestStorageTest.java @@ -48,7 +48,6 @@ import net.snowflake.client.jdbc.internal.google.common.util.concurrent.ThreadFactoryBuilder; import net.snowflake.ingest.TestUtils; import net.snowflake.ingest.connection.RequestBuilder; -import net.snowflake.ingest.utils.ParameterProvider; import net.snowflake.ingest.utils.SFException; import org.junit.Assert; import org.junit.Test; @@ -286,7 +285,6 @@ public void testRefreshSnowflakeMetadataRemote() throws Exception { StorageManager storageManager = new InternalStageManager(true, "role", "client", snowflakeServiceClient); - ParameterProvider parameterProvider = new ParameterProvider(false); StreamingIngestStorage stage = new StreamingIngestStorage( storageManager,