From ff8f7661f586eeb2b043e6dd1cf91ca085e02f8a Mon Sep 17 00:00:00 2001 From: Alec Huang Date: Thu, 5 Sep 2024 11:14:09 -0700 Subject: [PATCH] Address comments --- .../internal/ClientBufferParameters.java | 25 +- .../streaming/internal/ColumnMetadata.java | 6 + .../internal/DataValidationUtil.java | 24 +- .../internal/OpenChannelRequestInternal.java | 4 +- .../streaming/internal/ParquetColumn.java | 20 ++ .../streaming/internal/ParquetRowBuffer.java | 28 +-- .../internal/ParquetTypeGenerator.java | 17 +- .../streaming/internal/ParquetTypeInfo.java | 22 +- ...nowflakeStreamingIngestClientInternal.java | 93 ++++---- .../ingest/utils/IcebergDataTypeParser.java | 46 +++- .../ingest/utils/ParameterProvider.java | 2 + .../parquet/hadoop/BdecParquetWriter.java | 5 +- .../internal/IcebergDataTypeParserTest.java | 107 +++++++++ .../streaming/internal/RowBufferTest.java | 218 ++++++++++++++---- .../internal/SnowflakeServiceClientTest.java | 1 + .../SnowflakeStreamingIngestChannelTest.java | 3 + 16 files changed, 473 insertions(+), 148 deletions(-) create mode 100644 src/main/java/net/snowflake/ingest/streaming/internal/ParquetColumn.java create mode 100644 src/test/java/net/snowflake/ingest/streaming/internal/IcebergDataTypeParserTest.java diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java b/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java index ac05c814e..7a4eab274 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023 Snowflake Computing Inc. All rights reserved. + * Copyright (c) 2023-2024 Snowflake Computing Inc. All rights reserved. */ package net.snowflake.ingest.streaming.internal; @@ -20,6 +20,8 @@ public class ClientBufferParameters { private Constants.BdecParquetCompression bdecParquetCompression; + private boolean isIcebergMode; + /** * Private constructor used for test methods * @@ -27,18 +29,21 @@ public class ClientBufferParameters { * enabled * @param maxChunkSizeInBytes maximum chunk size in bytes * @param maxAllowedRowSizeInBytes maximum row size in bytes + * @param isIcebergMode */ private ClientBufferParameters( boolean enableParquetInternalBuffering, long maxChunkSizeInBytes, long maxAllowedRowSizeInBytes, Constants.BdecParquetCompression bdecParquetCompression, - boolean enableNewJsonParsingLogic) { + boolean enableNewJsonParsingLogic, + boolean isIcebergMode) { this.enableParquetInternalBuffering = enableParquetInternalBuffering; this.maxChunkSizeInBytes = maxChunkSizeInBytes; this.maxAllowedRowSizeInBytes = maxAllowedRowSizeInBytes; this.bdecParquetCompression = bdecParquetCompression; this.enableNewJsonParsingLogic = enableNewJsonParsingLogic; + this.isIcebergMode = isIcebergMode; } /** @param clientInternal reference to the client object where the relevant parameters are set */ @@ -59,11 +64,14 @@ public ClientBufferParameters(SnowflakeStreamingIngestClientInternal clientInter clientInternal != null ? clientInternal.getParameterProvider().getBdecParquetCompressionAlgorithm() : ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT; - this.enableNewJsonParsingLogic = clientInternal != null ? clientInternal.getParameterProvider().isEnableNewJsonParsingLogic() : ParameterProvider.ENABLE_NEW_JSON_PARSING_LOGIC_DEFAULT; + this.isIcebergMode = + clientInternal != null + ? 
clientInternal.isIcebergMode() + : ParameterProvider.IS_ICEBERG_MODE_DEFAULT; } /** @@ -71,6 +79,7 @@ public ClientBufferParameters(SnowflakeStreamingIngestClientInternal clientInter * enabled * @param maxChunkSizeInBytes maximum chunk size in bytes * @param maxAllowedRowSizeInBytes maximum row size in bytes + * @param isIcebergMode * @return ClientBufferParameters object */ public static ClientBufferParameters test_createClientBufferParameters( @@ -78,13 +87,15 @@ public static ClientBufferParameters test_createClientBufferParameters( long maxChunkSizeInBytes, long maxAllowedRowSizeInBytes, Constants.BdecParquetCompression bdecParquetCompression, - boolean enableNewJsonParsingLogic) { + boolean enableNewJsonParsingLogic, + boolean isIcebergMode) { return new ClientBufferParameters( enableParquetInternalBuffering, maxChunkSizeInBytes, maxAllowedRowSizeInBytes, bdecParquetCompression, - enableNewJsonParsingLogic); + enableNewJsonParsingLogic, + isIcebergMode); } public boolean getEnableParquetInternalBuffering() { @@ -106,4 +117,8 @@ public Constants.BdecParquetCompression getBdecParquetCompression() { public boolean isEnableNewJsonParsingLogic() { return enableNewJsonParsingLogic; } + + public boolean getIsIcebergMode() { + return isIcebergMode; + } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ColumnMetadata.java b/src/main/java/net/snowflake/ingest/streaming/internal/ColumnMetadata.java index 6f791a6c3..1231247b5 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ColumnMetadata.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ColumnMetadata.java @@ -21,6 +21,12 @@ class ColumnMetadata { private Integer length; private boolean nullable; private String collation; + + /** + * The Json serialization of Iceberg data type of the column, see JSON serialization + * for more details. + */ private String sourceIcebergDataType; /** diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/DataValidationUtil.java b/src/main/java/net/snowflake/ingest/streaming/internal/DataValidationUtil.java index 15e2e1ee4..55b14c57f 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/DataValidationUtil.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/DataValidationUtil.java @@ -972,7 +972,17 @@ static double validateAndParseReal(String columnName, Object input, long insertR */ static int validateAndParseIcebergInt(String columnName, Object input, long insertRowIndex) { if (input instanceof Number) { - return ((Number) input).intValue(); + double value = ((Number) input).doubleValue(); + long longValue = Math.round(value); + if (longValue > Integer.MAX_VALUE || longValue < Integer.MIN_VALUE || Double.isNaN(value)) { + throw new SFException( + ErrorCode.INVALID_VALUE_ROW, + String.format( + "Number out of representable inclusive range of integers between %d and %d," + + " rowIndex:%d", + Integer.MIN_VALUE, Integer.MAX_VALUE, insertRowIndex)); + } + return (int) longValue; } else if (input instanceof String) { try { return Integer.parseInt(((String) input).trim()); @@ -1000,7 +1010,17 @@ static int validateAndParseIcebergInt(String columnName, Object input, long inse */ static long validateAndParseIcebergLong(String columnName, Object input, long insertRowIndex) { if (input instanceof Number) { - return ((Number) input).longValue(); + double value = ((Number) input).doubleValue(); + double roundedDouble = (value > 0) ? 
Math.floor(value + 0.5) : Math.ceil(value - 0.5); + if (roundedDouble > Long.MAX_VALUE || roundedDouble < Long.MIN_VALUE || Double.isNaN(value)) { + throw new SFException( + ErrorCode.INVALID_VALUE_ROW, + String.format( + "Number out of representable inclusive range of longs between %d and %d," + + " rowIndex:%d", + Long.MIN_VALUE, Long.MAX_VALUE, insertRowIndex)); + } + return (long) roundedDouble; } else if (input instanceof String) { try { return Long.parseLong(((String) input).trim()); diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/OpenChannelRequestInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/OpenChannelRequestInternal.java index bd66b5fd6..c3c427c68 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/OpenChannelRequestInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/OpenChannelRequestInternal.java @@ -101,7 +101,7 @@ String getOffsetToken() { public String getStringForLogging() { return String.format( "OpenChannelRequestInternal(requestId=%s, role=%s, db=%s, schema=%s, table=%s, channel=%s," - + " writeMode=%s)", - requestId, role, database, schema, table, channel, writeMode); + + " writeMode=%s, isIceberg=%s)", + requestId, role, database, schema, table, channel, writeMode, isIceberg); } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetColumn.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetColumn.java new file mode 100644 index 000000000..68ed7a8c9 --- /dev/null +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetColumn.java @@ -0,0 +1,20 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + +package net.snowflake.ingest.streaming.internal; + +import org.apache.parquet.schema.Type; + +/** Represents a column in a Parquet file. */ +class ParquetColumn { + final ColumnMetadata columnMetadata; + final int index; + final Type type; + + ParquetColumn(ColumnMetadata columnMetadata, int index, Type type) { + this.columnMetadata = columnMetadata; + this.index = index; + this.type = type; + } +} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java index d23d40a39..352c92f0b 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java @@ -15,6 +15,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.function.Consumer; @@ -204,16 +205,17 @@ private float addRow( forkedStatsMap.put(columnName, forkedStats); ColumnMetadata column = parquetColumn.columnMetadata; ParquetBufferValue valueWithSize = - (column.getSourceIcebergDataType() == null - ? SnowflakeParquetValueParser.parseColumnValueToParquet( + (clientBufferParameters.getIsIcebergMode() + ? 
IcebergParquetValueParser.parseColumnValueToParquet( + value, parquetColumn.type, forkedStats, defaultTimezone, insertRowsCurrIndex) + : SnowflakeParquetValueParser.parseColumnValueToParquet( value, column, parquetColumn.type.asPrimitiveType().getPrimitiveTypeName(), forkedStats, defaultTimezone, - insertRowsCurrIndex) - : IcebergParquetValueParser.parseColumnValueToParquet( - value, parquetColumn.type, forkedStats, defaultTimezone, insertRowsCurrIndex)); + insertRowsCurrIndex, + clientBufferParameters.isEnableNewJsonParsingLogic())); indexedRow[colIndex] = valueWithSize.getValue(); size += valueWithSize.getSize(); } @@ -304,6 +306,10 @@ Object getVectorValueAt(String column, int index) { if (logicalType == ColumnLogicalType.BINARY && value != null) { value = value instanceof String ? ((String) value).getBytes(StandardCharsets.UTF_8) : value; } + /* Mismatch between Iceberg string & FDN String */ + if (Objects.equals(columnMetadata.getSourceIcebergDataType(), "\"string\"")) { + value = value instanceof byte[] ? new String((byte[]) value, StandardCharsets.UTF_8) : value; + } return value; } @@ -339,16 +345,4 @@ public Flusher createFlusher() { clientBufferParameters.getMaxChunkSizeInBytes(), clientBufferParameters.getBdecParquetCompression()); } - - private static class ParquetColumn { - final ColumnMetadata columnMetadata; - final int index; - final Type type; - - private ParquetColumn(ColumnMetadata columnMetadata, int index, Type type) { - this.columnMetadata = columnMetadata; - this.index = index; - this.type = type; - } - } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetTypeGenerator.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetTypeGenerator.java index d843b2bbb..d4b70f397 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetTypeGenerator.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetTypeGenerator.java @@ -4,16 +4,14 @@ package net.snowflake.ingest.streaming.internal; -import static net.snowflake.ingest.utils.IcebergDataTypeParser.deserializeIcebergType; - import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; import net.snowflake.ingest.utils.ErrorCode; +import net.snowflake.ingest.utils.IcebergDataTypeParser; import net.snowflake.ingest.utils.SFException; -import org.apache.iceberg.parquet.TypeToMessageType; import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Type; @@ -33,9 +31,6 @@ public class ParquetTypeGenerator { AbstractRowBuffer.ColumnPhysicalType.SB8, AbstractRowBuffer.ColumnPhysicalType.SB16)); - /** Util class that contains the mapping between Iceberg data type and Parquet data type */ - private static final TypeToMessageType typeToMessageType = new TypeToMessageType(); - /** * Generate the column parquet type and metadata from the column metadata received from server * side. @@ -46,7 +41,6 @@ public class ParquetTypeGenerator { */ static ParquetTypeInfo generateColumnParquetTypeInfo(ColumnMetadata column, int id) { id = column.getOrdinal() == null ? id : column.getOrdinal(); - ParquetTypeInfo res = new ParquetTypeInfo(); Type parquetType; Map metadata = new HashMap<>(); String name = column.getInternalName(); @@ -60,10 +54,9 @@ static ParquetTypeInfo generateColumnParquetTypeInfo(ColumnMetadata column, int column.getNullable() ? 
Type.Repetition.OPTIONAL : Type.Repetition.REQUIRED; if (column.getSourceIcebergDataType() != null) { - org.apache.iceberg.types.Type icebergDataType = - deserializeIcebergType(column.getSourceIcebergDataType()); parquetType = - typeToMessageType.primitive(icebergDataType.asPrimitiveType(), repetition, id, name); + IcebergDataTypeParser.parseIcebergDataTypeStringToParquetType( + column.getSourceIcebergDataType(), repetition, id, name); } else { AbstractRowBuffer.ColumnPhysicalType physicalType; AbstractRowBuffer.ColumnLogicalType logicalType; @@ -147,9 +140,7 @@ static ParquetTypeInfo generateColumnParquetTypeInfo(ColumnMetadata column, int ErrorCode.UNKNOWN_DATA_TYPE, column.getLogicalType(), column.getPhysicalType()); } } - res.setParquetType(parquetType); - res.setMetadata(metadata); - return res; + return new ParquetTypeInfo(parquetType, metadata); } /** diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetTypeInfo.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetTypeInfo.java index 54badadb7..9e98bbcc9 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetTypeInfo.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetTypeInfo.java @@ -5,7 +5,6 @@ package net.snowflake.ingest.streaming.internal; import java.util.Map; -import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Type; /** @@ -13,8 +12,13 @@ * server side scanner */ class ParquetTypeInfo { - private Type parquetType; - private Map metadata; + private final Type parquetType; + private final Map metadata; + + ParquetTypeInfo(Type parquetType, Map metadata) { + this.parquetType = parquetType; + this.metadata = metadata; + } public Type getParquetType() { return this.parquetType; @@ -23,16 +27,4 @@ public Type getParquetType() { public Map getMetadata() { return this.metadata; } - - public void setParquetType(Type parquetType) { - this.parquetType = parquetType; - } - - public void setMetadata(Map metadata) { - this.metadata = metadata; - } - - public PrimitiveType.PrimitiveTypeName getPrimitiveTypeName() { - return parquetType.asPrimitiveType().getPrimitiveTypeName(); - } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java index 60508c1ba..7535395db 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java @@ -335,6 +335,7 @@ public SnowflakeStreamingIngestChannelInternal openChannel(OpenChannelRequest request.getFullyQualifiedTableName(), getName()); + OpenChannelResponse response = null; try { OpenChannelRequestInternal openChannelRequest = new OpenChannelRequestInternal( @@ -347,49 +348,56 @@ public SnowflakeStreamingIngestChannelInternal openChannel(OpenChannelRequest Constants.WriteMode.CLOUD_STORAGE, this.isIcebergMode, request.getOffsetToken()); - OpenChannelResponse response = snowflakeServiceClient.openChannel(openChannelRequest); - - logger.logInfo( - "Open channel request succeeded, channel={}, table={}, clientSequencer={}," - + " rowSequencer={}, client={}", - request.getChannelName(), - request.getFullyQualifiedTableName(), - response.getClientSequencer(), - response.getRowSequencer(), - getName()); - - // Channel is now registered, add it to the in-memory channel pool - 
SnowflakeStreamingIngestChannelInternal channel = - SnowflakeStreamingIngestChannelFactory.builder(response.getChannelName()) - .setDBName(response.getDBName()) - .setSchemaName(response.getSchemaName()) - .setTableName(response.getTableName()) - .setOffsetToken(response.getOffsetToken()) - .setRowSequencer(response.getRowSequencer()) - .setChannelSequencer(response.getClientSequencer()) - .setOwningClient(this) - .setEncryptionKey(response.getEncryptionKey()) - .setEncryptionKeyId(response.getEncryptionKeyId()) - .setOnErrorOption(request.getOnErrorOption()) - .setDefaultTimezone(request.getDefaultTimezone()) - .setOffsetTokenVerificationFunction(request.getOffsetTokenVerificationFunction()) - .build(); - - // Setup the row buffer schema - channel.setupSchema(response.getTableColumns()); - - // Add channel to the channel cache - this.channelCache.addChannel(channel); - this.storageManager.addStorage( - response.getDBName(), - response.getSchemaName(), - response.getTableName(), - response.getExternalVolumeLocation()); - - return channel; + response = snowflakeServiceClient.openChannel(openChannelRequest); } catch (IOException | IngestResponseException e) { throw new SFException(e, ErrorCode.OPEN_CHANNEL_FAILURE, e.getMessage()); } + + if (isIcebergMode + && response.getTableColumns().stream() + .anyMatch(c -> c.getSourceIcebergDataType() == null)) { + throw new SFException( + ErrorCode.INTERNAL_ERROR, "Iceberg table columns must have sourceIcebergDataType set."); + } + + logger.logInfo( + "Open channel request succeeded, channel={}, table={}, clientSequencer={}," + + " rowSequencer={}, client={}", + request.getChannelName(), + request.getFullyQualifiedTableName(), + response.getClientSequencer(), + response.getRowSequencer(), + getName()); + + // Channel is now registered, add it to the in-memory channel pool + SnowflakeStreamingIngestChannelInternal channel = + SnowflakeStreamingIngestChannelFactory.builder(response.getChannelName()) + .setDBName(response.getDBName()) + .setSchemaName(response.getSchemaName()) + .setTableName(response.getTableName()) + .setOffsetToken(response.getOffsetToken()) + .setRowSequencer(response.getRowSequencer()) + .setChannelSequencer(response.getClientSequencer()) + .setOwningClient(this) + .setEncryptionKey(response.getEncryptionKey()) + .setEncryptionKeyId(response.getEncryptionKeyId()) + .setOnErrorOption(request.getOnErrorOption()) + .setDefaultTimezone(request.getDefaultTimezone()) + .setOffsetTokenVerificationFunction(request.getOffsetTokenVerificationFunction()) + .build(); + + // Setup the row buffer schema + channel.setupSchema(response.getTableColumns()); + + // Add channel to the channel cache + this.channelCache.addChannel(channel); + this.storageManager.addStorage( + response.getDBName(), + response.getSchemaName(), + response.getTableName(), + response.getExternalVolumeLocation()); + + return channel; } @Override @@ -919,6 +927,11 @@ public void setRefreshToken(String refreshToken) { } } + /** Return whether the client is streaming to Iceberg tables */ + boolean isIcebergMode() { + return isIcebergMode; + } + /** * Registers the performance metrics along with JVM memory and Threads. 
* diff --git a/src/main/java/net/snowflake/ingest/utils/IcebergDataTypeParser.java b/src/main/java/net/snowflake/ingest/utils/IcebergDataTypeParser.java index 4a1b084eb..95e484b70 100644 --- a/src/main/java/net/snowflake/ingest/utils/IcebergDataTypeParser.java +++ b/src/main/java/net/snowflake/ingest/utils/IcebergDataTypeParser.java @@ -4,14 +4,15 @@ package net.snowflake.ingest.utils; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; -import java.io.IOException; import java.util.Iterator; import java.util.List; import javax.annotation.Nonnull; +import org.apache.iceberg.parquet.TypeToMessageType; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.JsonUtil; @@ -46,6 +47,31 @@ public class IcebergDataTypeParser { /** Object mapper for this class */ private static final ObjectMapper MAPPER = new ObjectMapper(); + /** Util class that contains the mapping between Iceberg data type and Parquet data type */ + private static final TypeToMessageType typeToMessageType = new TypeToMessageType(); + + /** + * Get Iceberg data type information by deserialization. + * + * @param icebergDataType string representation of Iceberg data type + * @param repetition repetition of the Parquet data type + * @param id column id + * @param name column name + * @return Iceberg data type + */ + public static org.apache.parquet.schema.Type parseIcebergDataTypeStringToParquetType( + String icebergDataType, + org.apache.parquet.schema.Type.Repetition repetition, + int id, + String name) { + Type icebergType = deserializeIcebergType(icebergDataType); + if (!icebergType.isPrimitiveType()) { + throw new IllegalArgumentException( + String.format("Snowflake supports only primitive Iceberg types, got '%s'", icebergType)); + } + return typeToMessageType.primitive(icebergType.asPrimitiveType(), repetition, id, name); + } + /** * Get Iceberg data type information by deserialization. 
* @@ -56,8 +82,9 @@ public static Type deserializeIcebergType(String icebergDataType) { try { JsonNode json = MAPPER.readTree(icebergDataType); return getTypeFromJson(json); - } catch (IOException e) { - throw new SFException(ErrorCode.INTERNAL_ERROR, "Failed to deserialize Iceberg data type", e); + } catch (JsonProcessingException e) { + throw new IllegalArgumentException( + String.format("Failed to deserialize Iceberg data type: %s", icebergDataType)); } } @@ -71,6 +98,10 @@ public static Type getTypeFromJson(@Nonnull JsonNode jsonNode) { if (jsonNode.isTextual()) { return Types.fromPrimitiveString(jsonNode.asText()); } else if (jsonNode.isObject()) { + if (!jsonNode.has(TYPE)) { + throw new IllegalArgumentException( + String.format("Missing key '%s' in schema: %s", TYPE, jsonNode)); + } String type = jsonNode.get(TYPE).asText(); if (STRUCT.equals(type)) { return structFromJson(jsonNode); @@ -79,9 +110,11 @@ public static Type getTypeFromJson(@Nonnull JsonNode jsonNode) { } else if (MAP.equals(type)) { return mapFromJson(jsonNode); } + throw new IllegalArgumentException( + String.format("Cannot parse Iceberg type: %s, schema: %s", type, jsonNode)); } - throw new SFException(ErrorCode.INTERNAL_ERROR, "Cannot parse Iceberg type from: " + jsonNode); + throw new IllegalArgumentException("Cannot parse Iceberg type from schema: " + jsonNode); } /** @@ -91,7 +124,12 @@ public static Type getTypeFromJson(@Nonnull JsonNode jsonNode) { * @return struct type */ public static @Nonnull Types.StructType structFromJson(@Nonnull JsonNode json) { + if (!json.has(FIELDS)) { + throw new IllegalArgumentException( + String.format("Missing key '%s' in schema: %s", FIELDS, json)); + } JsonNode fieldArray = json.get(FIELDS); + Preconditions.checkArgument(fieldArray != null, "Field array cannot be null"); Preconditions.checkArgument( fieldArray.isArray(), "Cannot parse struct fields from non-array: %s", fieldArray); diff --git a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java index 0525737a3..95e0339f6 100644 --- a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java +++ b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java @@ -85,6 +85,8 @@ public class ParameterProvider { public static final boolean ENABLE_NEW_JSON_PARSING_LOGIC_DEFAULT = true; + public static final boolean IS_ICEBERG_MODE_DEFAULT = false; + /** Map of parameter name to parameter value. This will be set by client/configure API Call. 
*/ private final Map parameterMap = new HashMap<>(); diff --git a/src/main/java/org/apache/parquet/hadoop/BdecParquetWriter.java b/src/main/java/org/apache/parquet/hadoop/BdecParquetWriter.java index e96ea5e28..eb5c20f03 100644 --- a/src/main/java/org/apache/parquet/hadoop/BdecParquetWriter.java +++ b/src/main/java/org/apache/parquet/hadoop/BdecParquetWriter.java @@ -13,6 +13,7 @@ import net.snowflake.ingest.utils.ErrorCode; import net.snowflake.ingest.utils.SFException; import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.column.values.factory.DefaultV1ValuesWriterFactory; import org.apache.parquet.crypto.FileEncryptionProperties; @@ -265,7 +266,7 @@ public void prepareForWrite(RecordConsumer recordConsumer) { @Override public void write(List values) { - List cols = schema.getFields(); + List cols = schema.getColumns(); if (values.size() != cols.size()) { throw new ParquetEncodingException( "Invalid input data in channel '" @@ -325,6 +326,8 @@ private void writeValues(List values, GroupType type) { throw new ParquetEncodingException( "Unsupported column type: " + cols.get(i).asPrimitiveType()); } + } else { + throw new ParquetEncodingException("Unsupported column type: " + cols.get(i)); } recordConsumer.endField(fieldName, i); } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/IcebergDataTypeParserTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/IcebergDataTypeParserTest.java new file mode 100644 index 000000000..e12d2f4f7 --- /dev/null +++ b/src/test/java/net/snowflake/ingest/streaming/internal/IcebergDataTypeParserTest.java @@ -0,0 +1,107 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + +package net.snowflake.ingest.streaming.internal; + +import java.util.ArrayList; +import java.util.List; +import net.snowflake.ingest.utils.IcebergDataTypeParser; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** Test for Iceberg data type serialization and deserialization. */ +public class IcebergDataTypeParserTest { + private class DataTypeInfo { + + // Json representation of Iceberg data type + String jsonStr; + Type icebergType; + + DataTypeInfo(String jsonStr, Type icebergType) { + this.jsonStr = jsonStr; + this.icebergType = icebergType; + } + } + + private int fieldId = 0; + + List dataTypesToTest; + + @Before + public void setup() { + fieldId = 0; + + // Create a Iceberg data type information list with primitive types and nested types. 
+ dataTypesToTest = new ArrayList<>(); + dataTypesToTest.add(new DataTypeInfo("\"boolean\"", Types.BooleanType.get())); + dataTypesToTest.add(new DataTypeInfo("\"int\"", Types.IntegerType.get())); + dataTypesToTest.add(new DataTypeInfo("\"long\"", Types.LongType.get())); + dataTypesToTest.add(new DataTypeInfo("\"float\"", Types.FloatType.get())); + dataTypesToTest.add(new DataTypeInfo("\"double\"", Types.DoubleType.get())); + dataTypesToTest.add(new DataTypeInfo("\"string\"", Types.StringType.get())); + dataTypesToTest.add(new DataTypeInfo("\"date\"", Types.DateType.get())); + dataTypesToTest.add(new DataTypeInfo("\"time\"", Types.TimeType.get())); + dataTypesToTest.add(new DataTypeInfo("\"timestamptz\"", Types.TimestampType.withZone())); + dataTypesToTest.add( + new DataTypeInfo( + "{\"type\":\"struct\",\"fields\":[{\"id\":1,\"name\":\"first\",\"required\":false,\"type\":\"int\"},{\"id\":2,\"name\":\"second\",\"required\":false,\"type\":\"int\"}]}", + Types.StructType.of( + Types.NestedField.optional(generateFieldId(), "first", Types.IntegerType.get()), + Types.NestedField.optional(generateFieldId(), "second", Types.IntegerType.get())))); + dataTypesToTest.add( + new DataTypeInfo( + "{\"type\":\"list\",\"element-id\":3,\"element\":\"int\",\"element-required\":false}", + Types.ListType.ofOptional(generateFieldId(), Types.IntegerType.get()))); + dataTypesToTest.add( + new DataTypeInfo( + "{\"type\":\"map\",\"key-id\":4,\"key\":\"int\",\"value-id\":5,\"value\":\"string\",\"value-required\":false}", + Types.MapType.ofOptional( + generateFieldId(), + generateFieldId(), + Types.IntegerType.get(), + Types.StringType.get()))); + } + + /** Helper function to generate a unique fieldId for nested types */ + private int generateFieldId() { + fieldId++; + return fieldId; + } + + /** Test for Iceberg data type deserialization. */ + @Test + public void testDeserializeIcebergType() { + for (int i = 0; i < dataTypesToTest.size(); i++) { + DataTypeInfo typeInfo = dataTypesToTest.get(i); + Type dataType = IcebergDataTypeParser.deserializeIcebergType(typeInfo.jsonStr); + Assert.assertEquals(typeInfo.icebergType, dataType); + } + } + + @Test + public void testDeserializeIcebergTypeFailed() { + String json = "bad json"; + IllegalArgumentException exception = + Assert.assertThrows( + IllegalArgumentException.class, + () -> IcebergDataTypeParser.deserializeIcebergType(json)); + Assert.assertEquals( + "Failed to deserialize Iceberg data type: bad json", exception.getMessage()); + } + + @Test + public void testUnsupportedIcebergType() { + String json = "{\"type\":\"unsupported\"}"; + IllegalArgumentException exception = + Assert.assertThrows( + IllegalArgumentException.class, + () -> IcebergDataTypeParser.deserializeIcebergType(json)); + Assert.assertEquals( + "Cannot parse Iceberg type: unsupported, schema: {\"type\":\"unsupported\"}", + exception.getMessage()); + } +} diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java index 41739c267..a83dc9f23 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. 
+ */ + package net.snowflake.ingest.streaming.internal; import static java.time.ZoneOffset.UTC; @@ -18,17 +22,28 @@ import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import net.snowflake.ingest.streaming.InsertValidationResponse; import net.snowflake.ingest.streaming.OpenChannelRequest; import net.snowflake.ingest.utils.Constants; import net.snowflake.ingest.utils.ErrorCode; import net.snowflake.ingest.utils.SFException; import org.apache.commons.codec.binary.Hex; +import org.apache.commons.lang3.StringUtils; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +@RunWith(Parameterized.class) public class RowBufferTest { + @Parameterized.Parameters(name = "isIcebergMode: {0}") + public static Object[] isIcebergMode() { + return new Object[] {false, true}; + } + + @Parameterized.Parameter public static boolean isIcebergMode; private final boolean enableParquetMemoryOptimization; private AbstractRowBuffer rowBufferOnErrorContinue; @@ -108,6 +123,16 @@ static List createSchema() { colChar.setLength(11); colChar.setScale(0); + if (isIcebergMode) { + colTinyIntCase.setSourceIcebergDataType("\"decimal(2,0)\""); + colTinyInt.setSourceIcebergDataType("\"decimal(1,0)\""); + colSmallInt.setSourceIcebergDataType("\"decimal(2,0)\""); + colInt.setSourceIcebergDataType("\"int\""); + colBigInt.setSourceIcebergDataType("\"long\""); + colDecimal.setSourceIcebergDataType("\"decimal(38,2)\""); + colChar.setSourceIcebergDataType("\"string\""); + } + List columns = Arrays.asList( colTinyIntCase, colTinyInt, colSmallInt, colInt, colBigInt, colDecimal, colChar); @@ -131,7 +156,8 @@ private AbstractRowBuffer createTestBuffer(OpenChannelRequest.OnErrorOption o MAX_CHUNK_SIZE_IN_BYTES_DEFAULT, MAX_ALLOWED_ROW_SIZE_IN_BYTES_DEFAULT, Constants.BdecParquetCompression.GZIP, - ENABLE_NEW_JSON_PARSING_LOGIC_DEFAULT), + ENABLE_NEW_JSON_PARSING_LOGIC_DEFAULT, + isIcebergMode), null, null); } @@ -277,9 +303,12 @@ public void testInvalidPhysicalType() { @Test public void testStringLength() { - testStringLengthHelper(this.rowBufferOnErrorContinue); - testStringLengthHelper(this.rowBufferOnErrorAbort); - testStringLengthHelper(this.rowBufferOnErrorSkipBatch); + /* Iceberg cannot specify max length of string */ + if (!isIcebergMode) { + testStringLengthHelper(this.rowBufferOnErrorContinue); + testStringLengthHelper(this.rowBufferOnErrorAbort); + testStringLengthHelper(this.rowBufferOnErrorSkipBatch); + } } @Test @@ -297,7 +326,7 @@ public void testRowIndexWithMultipleRowsWithErrorHelper(AbstractRowBuffer row rows.add(row); row = new HashMap<>(); - row.put("colChar", "1111111111111111111111"); // too big + row.put("colChar", StringUtils.repeat('1', 16777217)); // too big rows.add(row); row = new HashMap<>(); @@ -305,7 +334,7 @@ public void testRowIndexWithMultipleRowsWithErrorHelper(AbstractRowBuffer row rows.add(row); row = new HashMap<>(); - row.put("colChar", "1111111111111111111111"); // too big + row.put("colChar", StringUtils.repeat('1', 16777217)); // too big rows.add(row); InsertValidationResponse response = rowBuffer.insertRows(rows, null, null); @@ -337,8 +366,8 @@ public void testRowIndexWithMultipleRowsWithErrorHelper(AbstractRowBuffer row .equalsIgnoreCase( "The given row cannot be converted to the internal format due to invalid value:" + " Value cannot be ingested into Snowflake column COLCHAR of type STRING," - + " 
rowIndex:1, reason: String too long: length=22 characters maxLength=11" - + " characters")); + + " rowIndex:1, reason: String too long: length=16777217 bytes" + + " maxLength=16777216 bytes")); Assert.assertTrue( response .getInsertErrors() @@ -348,8 +377,8 @@ public void testRowIndexWithMultipleRowsWithErrorHelper(AbstractRowBuffer row .equalsIgnoreCase( "The given row cannot be converted to the internal format due to invalid value:" + " Value cannot be ingested into Snowflake column COLCHAR of type STRING," - + " rowIndex:3, reason: String too long: length=22 characters maxLength=11" - + " characters")); + + " rowIndex:3, reason: String too long: length=16777217 bytes" + + " maxLength=16777216 bytes")); } private void testStringLengthHelper(AbstractRowBuffer rowBuffer) { @@ -825,6 +854,12 @@ private void testStatsE2ETimestampHelper(OpenChannelRequest.OnErrorOption onErro colTimestampLtzSB16Scale6.setLogicalType("TIMESTAMP_LTZ"); colTimestampLtzSB16Scale6.setScale(6); + if (isIcebergMode) { + colTimestampLtzSB8.setSourceIcebergDataType("\"timestamptz\""); + colTimestampLtzSB16.setSourceIcebergDataType("\"timestamptz\""); + colTimestampLtzSB16Scale6.setSourceIcebergDataType("\"timestamptz\""); + } + innerBuffer.setupSchema( Arrays.asList(colTimestampLtzSB8, colTimestampLtzSB16, colTimestampLtzSB16Scale6)); @@ -850,18 +885,23 @@ private void testStatsE2ETimestampHelper(OpenChannelRequest.OnErrorOption onErro Assert.assertEquals(3, result.getRowCount()); Assert.assertEquals( - BigInteger.valueOf(1621899220), + BigInteger.valueOf(1621899220 * (isIcebergMode ? 1000000L : 1)), result.getColumnEps().get("COLTIMESTAMPLTZ_SB8").getCurrentMinIntValue()); Assert.assertEquals( - BigInteger.valueOf(1621899221), + BigInteger.valueOf(1621899221 * (isIcebergMode ? 
1000000L : 1)), result.getColumnEps().get("COLTIMESTAMPLTZ_SB8").getCurrentMaxIntValue()); - Assert.assertEquals( - new BigInteger("1621899220123456789"), - result.getColumnEps().get("COLTIMESTAMPLTZ_SB16").getCurrentMinIntValue()); - Assert.assertEquals( - new BigInteger("1621899220223456789"), - result.getColumnEps().get("COLTIMESTAMPLTZ_SB16").getCurrentMaxIntValue()); + /* Iceberg only supports microsecond precision for TIMESTAMP_LTZ */ + if (!isIcebergMode) { + Assert.assertEquals( + new BigInteger("1621899220123456789"), + result.getColumnEps().get("COLTIMESTAMPLTZ_SB16").getCurrentMinIntValue()); + Assert.assertEquals( + new BigInteger("1621899220223456789"), + result.getColumnEps().get("COLTIMESTAMPLTZ_SB16").getCurrentMaxIntValue()); + Assert.assertEquals( + 1, result.getColumnEps().get("COLTIMESTAMPLTZ_SB16").getCurrentNullCount()); + } Assert.assertEquals( new BigInteger("1621899220123456"), @@ -871,7 +911,6 @@ private void testStatsE2ETimestampHelper(OpenChannelRequest.OnErrorOption onErro result.getColumnEps().get("COLTIMESTAMPLTZ_SB16_SCALE6").getCurrentMaxIntValue()); Assert.assertEquals(1, result.getColumnEps().get("COLTIMESTAMPLTZ_SB8").getCurrentNullCount()); - Assert.assertEquals(1, result.getColumnEps().get("COLTIMESTAMPLTZ_SB16").getCurrentNullCount()); Assert.assertEquals( 1, result.getColumnEps().get("COLTIMESTAMPLTZ_SB16_SCALE6").getCurrentNullCount()); } @@ -952,6 +991,11 @@ private void testE2ETimeHelper(OpenChannelRequest.OnErrorOption onErrorOption) { colTimeSB8.setLogicalType("TIME"); colTimeSB8.setScale(3); + if (isIcebergMode) { + colTimeSB4.setSourceIcebergDataType("\"time\""); + colTimeSB8.setSourceIcebergDataType("\"time\""); + } + innerBuffer.setupSchema(Arrays.asList(colTimeSB4, colTimeSB8)); Map row1 = new HashMap<>(); @@ -971,34 +1015,65 @@ private void testE2ETimeHelper(OpenChannelRequest.OnErrorOption onErrorOption) { Assert.assertFalse(response.hasErrors()); // Check data was inserted into the buffer correctly - Assert.assertEquals(10 * 60 * 60, innerBuffer.getVectorValueAt("COLTIMESB4", 0)); - Assert.assertEquals(11 * 60 * 60 + 15 * 60, innerBuffer.getVectorValueAt("COLTIMESB4", 1)); - Assert.assertNull(innerBuffer.getVectorValueAt("COLTIMESB4", 2)); + if (isIcebergMode) { + Assert.assertEquals(10 * 60 * 60 * 1000000L, innerBuffer.getVectorValueAt("COLTIMESB4", 0)); + Assert.assertEquals( + (11 * 60 * 60 + 15 * 60) * 1000000L, innerBuffer.getVectorValueAt("COLTIMESB4", 1)); + Assert.assertEquals( + (10 * 60 * 60 * 1000L + 123) * 1000L, innerBuffer.getVectorValueAt("COLTIMESB8", 0)); + Assert.assertEquals( + (11 * 60 * 60 * 1000L + 15 * 60 * 1000 + 456) * 1000L, + innerBuffer.getVectorValueAt("COLTIMESB8", 1)); + } else { + Assert.assertEquals(10 * 60 * 60, innerBuffer.getVectorValueAt("COLTIMESB4", 0)); + Assert.assertEquals(11 * 60 * 60 + 15 * 60, innerBuffer.getVectorValueAt("COLTIMESB4", 1)); + Assert.assertEquals( + 10 * 60 * 60 * 1000L + 123, innerBuffer.getVectorValueAt("COLTIMESB8", 0)); + Assert.assertEquals( + 11 * 60 * 60 * 1000L + 15 * 60 * 1000 + 456, + innerBuffer.getVectorValueAt("COLTIMESB8", 1)); + } - Assert.assertEquals(10 * 60 * 60 * 1000L + 123, innerBuffer.getVectorValueAt("COLTIMESB8", 0)); - Assert.assertEquals( - 11 * 60 * 60 * 1000L + 15 * 60 * 1000 + 456, innerBuffer.getVectorValueAt("COLTIMESB8", 1)); + Assert.assertNull(innerBuffer.getVectorValueAt("COLTIMESB4", 2)); Assert.assertNull(innerBuffer.getVectorValueAt("COLTIMESB8", 2)); // Check stats generation ChannelData result = 
innerBuffer.flush("my_snowpipe_streaming.bdec"); Assert.assertEquals(3, result.getRowCount()); - Assert.assertEquals( - BigInteger.valueOf(10 * 60 * 60), - result.getColumnEps().get("COLTIMESB4").getCurrentMinIntValue()); - Assert.assertEquals( - BigInteger.valueOf(11 * 60 * 60 + 15 * 60), - result.getColumnEps().get("COLTIMESB4").getCurrentMaxIntValue()); - Assert.assertEquals(1, result.getColumnEps().get("COLTIMESB4").getCurrentNullCount()); + if (isIcebergMode) { + Assert.assertEquals( + BigInteger.valueOf(10 * 60 * 60 * 1000000L), + result.getColumnEps().get("COLTIMESB4").getCurrentMinIntValue()); + Assert.assertEquals( + BigInteger.valueOf((11 * 60 * 60 + 15 * 60) * 1000000L), + result.getColumnEps().get("COLTIMESB4").getCurrentMaxIntValue()); + Assert.assertEquals(1, result.getColumnEps().get("COLTIMESB4").getCurrentNullCount()); - Assert.assertEquals( - BigInteger.valueOf(10 * 60 * 60 * 1000L + 123), - result.getColumnEps().get("COLTIMESB8").getCurrentMinIntValue()); - Assert.assertEquals( - BigInteger.valueOf(11 * 60 * 60 * 1000L + 15 * 60 * 1000 + 456), - result.getColumnEps().get("COLTIMESB8").getCurrentMaxIntValue()); - Assert.assertEquals(1, result.getColumnEps().get("COLTIMESB8").getCurrentNullCount()); + Assert.assertEquals( + BigInteger.valueOf((10 * 60 * 60 * 1000L + 123) * 1000L), + result.getColumnEps().get("COLTIMESB8").getCurrentMinIntValue()); + Assert.assertEquals( + BigInteger.valueOf((11 * 60 * 60 * 1000L + 15 * 60 * 1000 + 456) * 1000L), + result.getColumnEps().get("COLTIMESB8").getCurrentMaxIntValue()); + Assert.assertEquals(1, result.getColumnEps().get("COLTIMESB8").getCurrentNullCount()); + } else { + Assert.assertEquals( + BigInteger.valueOf(10 * 60 * 60), + result.getColumnEps().get("COLTIMESB4").getCurrentMinIntValue()); + Assert.assertEquals( + BigInteger.valueOf(11 * 60 * 60 + 15 * 60), + result.getColumnEps().get("COLTIMESB4").getCurrentMaxIntValue()); + Assert.assertEquals(1, result.getColumnEps().get("COLTIMESB4").getCurrentNullCount()); + + Assert.assertEquals( + BigInteger.valueOf(10 * 60 * 60 * 1000L + 123), + result.getColumnEps().get("COLTIMESB8").getCurrentMinIntValue()); + Assert.assertEquals( + BigInteger.valueOf(11 * 60 * 60 * 1000L + 15 * 60 * 1000 + 456), + result.getColumnEps().get("COLTIMESB8").getCurrentMaxIntValue()); + Assert.assertEquals(1, result.getColumnEps().get("COLTIMESB8").getCurrentNullCount()); + } } @Test @@ -1017,6 +1092,7 @@ private void testMaxInsertRowsBatchSizeHelper(OpenChannelRequest.OnErrorOption o colBinary.setLogicalType("BINARY"); colBinary.setLength(8 * 1024 * 1024); colBinary.setByteLength(8 * 1024 * 1024); + colBinary.setSourceIcebergDataType("\"binary\""); byte[] arr = new byte[8 * 1024 * 1024]; innerBuffer.setupSchema(Collections.singletonList(colBinary)); @@ -1301,6 +1377,9 @@ private void testE2EBinaryHelper(OpenChannelRequest.OnErrorOption onErrorOption) colBinary.setLength(32); colBinary.setByteLength(256); colBinary.setScale(0); + if (isIcebergMode) { + colBinary.setSourceIcebergDataType("\"binary\""); + } innerBuffer.setupSchema(Collections.singletonList(colBinary)); @@ -1543,9 +1622,11 @@ public void testOnErrorAbortSkipBatch() { @Test public void testE2EVariant() { - testE2EVariantHelper(OpenChannelRequest.OnErrorOption.ABORT); - testE2EVariantHelper(OpenChannelRequest.OnErrorOption.CONTINUE); - testE2EVariantHelper(OpenChannelRequest.OnErrorOption.SKIP_BATCH); + if (!isIcebergMode) { + testE2EVariantHelper(OpenChannelRequest.OnErrorOption.ABORT); + 
testE2EVariantHelper(OpenChannelRequest.OnErrorOption.CONTINUE); + testE2EVariantHelper(OpenChannelRequest.OnErrorOption.SKIP_BATCH); + } } private void testE2EVariantHelper(OpenChannelRequest.OnErrorOption onErrorOption) { @@ -1594,9 +1675,11 @@ private void testE2EVariantHelper(OpenChannelRequest.OnErrorOption onErrorOption @Test public void testE2EObject() { - testE2EObjectHelper(OpenChannelRequest.OnErrorOption.ABORT); - testE2EObjectHelper(OpenChannelRequest.OnErrorOption.CONTINUE); - testE2EObjectHelper(OpenChannelRequest.OnErrorOption.SKIP_BATCH); + if (!isIcebergMode) { + testE2EObjectHelper(OpenChannelRequest.OnErrorOption.ABORT); + testE2EObjectHelper(OpenChannelRequest.OnErrorOption.CONTINUE); + testE2EObjectHelper(OpenChannelRequest.OnErrorOption.SKIP_BATCH); + } } private void testE2EObjectHelper(OpenChannelRequest.OnErrorOption onErrorOption) { @@ -1627,9 +1710,11 @@ private void testE2EObjectHelper(OpenChannelRequest.OnErrorOption onErrorOption) @Test public void testE2EArray() { - testE2EArrayHelper(OpenChannelRequest.OnErrorOption.ABORT); - testE2EArrayHelper(OpenChannelRequest.OnErrorOption.CONTINUE); - testE2EArrayHelper(OpenChannelRequest.OnErrorOption.SKIP_BATCH); + if (!isIcebergMode) { + testE2EArrayHelper(OpenChannelRequest.OnErrorOption.ABORT); + testE2EArrayHelper(OpenChannelRequest.OnErrorOption.CONTINUE); + testE2EArrayHelper(OpenChannelRequest.OnErrorOption.SKIP_BATCH); + } } private void testE2EArrayHelper(OpenChannelRequest.OnErrorOption onErrorOption) { @@ -1709,7 +1794,8 @@ public void testOnErrorAbortRowsWithError() { // insert one valid and one invalid row List> mixedRows = new ArrayList<>(); mixedRows.add(Collections.singletonMap("colChar", "b")); - mixedRows.add(Collections.singletonMap("colChar", "1111111111111111111111")); // too big + mixedRows.add( + Collections.singletonMap("colChar", StringUtils.repeat('1', 16777217))); // too big response = innerBufferOnErrorContinue.insertRows(mixedRows, "1", "3"); Assert.assertTrue(response.hasErrors()); @@ -1719,6 +1805,23 @@ public void testOnErrorAbortRowsWithError() { List> snapshotContinueParquet = ((ParquetChunkData) innerBufferOnErrorContinue.getSnapshot("fake/filePath").get()).rows; + if (isIcebergMode) { + // Convert every object to string for iceberg mode + snapshotContinueParquet = + snapshotContinueParquet.stream() + .map( + row -> + row.stream() + .map( + obj -> { + if (obj instanceof byte[]) { + return new String((byte[]) obj, StandardCharsets.UTF_8); + } + return obj; + }) + .collect(Collectors.toList())) + .collect(Collectors.toList()); + } // validRows and only the good row from mixedRows are in the buffer Assert.assertEquals(2, snapshotContinueParquet.size()); Assert.assertEquals(Arrays.asList("a"), snapshotContinueParquet.get(0)); @@ -1726,6 +1829,23 @@ public void testOnErrorAbortRowsWithError() { List> snapshotAbortParquet = ((ParquetChunkData) innerBufferOnErrorAbort.getSnapshot("fake/filePath").get()).rows; + if (isIcebergMode) { + // Convert every object to string for iceberg mode + snapshotAbortParquet = + snapshotAbortParquet.stream() + .map( + row -> + row.stream() + .map( + obj -> { + if (obj instanceof byte[]) { + return new String((byte[]) obj, StandardCharsets.UTF_8); + } + return obj; + }) + .collect(Collectors.toList())) + .collect(Collectors.toList()); + } // only validRows and none of the mixedRows are in the buffer Assert.assertEquals(1, snapshotAbortParquet.size()); Assert.assertEquals(Arrays.asList("a"), snapshotAbortParquet.get(0)); diff --git 
a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeServiceClientTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeServiceClientTest.java index 0e113c0ab..4815a5469 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeServiceClientTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeServiceClientTest.java @@ -186,6 +186,7 @@ public void testOpenChannel() throws IngestResponseException, IOException { "test_table", "test_channel", Constants.WriteMode.CLOUD_STORAGE, + false, "test_offset_token"); OpenChannelResponse openChannelResponse = snowflakeServiceClient.openChannel(openChannelRequest); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java index dc3285df8..27fc974c9 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java @@ -632,6 +632,9 @@ public void testInsertTooLargeRow() { col.setLogicalType("BINARY"); col.setLength(8388608); col.setByteLength(8388608); + if (isIcebergMode) { + col.setSourceIcebergDataType("\"binary\""); + } return col; }) .collect(Collectors.toList());
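Reviewer note (illustrative, not part of the patch): the reworked Iceberg numeric validators in DataValidationUtil now round fractional inputs to the nearest integer before range-checking, instead of truncating via Number.intValue()/longValue(), and reject NaN and out-of-range values with ErrorCode.INVALID_VALUE_ROW. A minimal sketch of that behavior follows; the class name, column name, and row indexes are invented for illustration, and the sketch sits in the internal package only because the validators are package-private.

package net.snowflake.ingest.streaming.internal;

/** Illustrative sketch of the Iceberg numeric validation semantics introduced above. */
public class IcebergNumericValidationSketch {
  public static void main(String[] args) {
    // Fractional doubles are rounded to the nearest int before the range check: 3.7 -> 4.
    int i = DataValidationUtil.validateAndParseIcebergInt("MY_COL", 3.7d, 0L);
    System.out.println(i);

    // Longs round half away from zero (floor(v + 0.5) for positives, ceil(v - 0.5) for
    // negatives): -2.5 -> -3.
    long l = DataValidationUtil.validateAndParseIcebergLong("MY_COL", -2.5d, 1L);
    System.out.println(l);

    // Values outside the int/long range (or NaN) now throw SFException with
    // ErrorCode.INVALID_VALUE_ROW, e.g. validateAndParseIcebergInt("MY_COL", 1e10, 2L).
  }
}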
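Reviewer note (illustrative, not part of the patch): ParquetTypeGenerator now delegates Iceberg type handling to the new public helper IcebergDataTypeParser.parseIcebergDataTypeStringToParquetType, which deserializes the server-provided JSON type string and maps primitive Iceberg types to Parquet types. A small usage sketch follows, assuming only the signature visible in the hunk above; the field id and column name are example values.

import net.snowflake.ingest.utils.IcebergDataTypeParser;
import org.apache.parquet.schema.Type;

/** Illustrative sketch of the new IcebergDataTypeParser helper. */
public class IcebergTypeParsingSketch {
  public static void main(String[] args) {
    // Primitive Iceberg types arrive as quoted JSON strings, e.g. "\"decimal(38,2)\"".
    Type parquetType =
        IcebergDataTypeParser.parseIcebergDataTypeStringToParquetType(
            "\"decimal(38,2)\"", Type.Repetition.OPTIONAL, /* id= */ 1, /* name= */ "COL1");
    System.out.println(parquetType);

    // Struct/list/map types and malformed JSON are rejected with IllegalArgumentException,
    // matching the messages asserted in IcebergDataTypeParserTest.
  }
}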