diff --git a/pom.xml b/pom.xml index 243bbce34..73483f5ed 100644 --- a/pom.xml +++ b/pom.xml @@ -1,4 +1,7 @@ + 4.0.0 @@ -48,6 +51,7 @@ 2.17.2 32.0.1-jre 3.3.6 + 1.3.1 true 0.8.5 ${project.build.directory}/dependency-jars @@ -262,6 +266,21 @@ + + org.apache.iceberg + iceberg-api + ${iceberg.version} + + + org.apache.iceberg + iceberg-core + ${iceberg.version} + + + org.apache.iceberg + iceberg-parquet + ${iceberg.version} + org.apache.parquet parquet-column @@ -506,6 +525,34 @@ org.apache.hadoop hadoop-mapreduce-client-core + + org.apache.iceberg + iceberg-api + + + org.apache.iceberg + iceberg-core + + + io.airlift + aircompressor + + + + + org.apache.iceberg + iceberg-parquet + + + io.airlift + aircompressor + + + org.apache.parquet + parquet-avro + + + org.apache.parquet @@ -901,9 +948,10 @@ |Apache-2.0 |Apache License, Version 2.0 |Apache 2.0 - |Apache License V2.0 + |Apache License V2.0 + |Apache 2 BSD 2-Clause License - |The BSD License + |The BSD License |BSD The MIT License|MIT License 3-Clause BSD License|BSD-3-Clause @@ -1146,6 +1194,10 @@ io.airlift.compress ${shadeBase}.io.airlift.compress + + org.roaringbitmap + ${shadeBase}.org.roaringbitmap + @@ -1164,6 +1216,9 @@ META-INF/*.SF META-INF/*.DSA META-INF/*.RSA + LICENSE + NOTICE + iceberg-build.properties google/protobuf/**/*.proto diff --git a/scripts/process_licenses.py b/scripts/process_licenses.py index 4a0377a8e..9f715abd6 100644 --- a/scripts/process_licenses.py +++ b/scripts/process_licenses.py @@ -61,6 +61,9 @@ "org.bouncycastle:bcpkix-jdk18on": BOUNCY_CASTLE_LICENSE, "org.bouncycastle:bcutil-jdk18on": BOUNCY_CASTLE_LICENSE, "org.bouncycastle:bcprov-jdk18on": BOUNCY_CASTLE_LICENSE, + "com.thoughtworks.paranamer:paranamer": BSD_2_CLAUSE_LICENSE, + "org.roaringbitmap:RoaringBitmap": APACHE_LICENSE, + "org.roaringbitmap:shims": APACHE_LICENSE, } @@ -115,7 +118,7 @@ def main(): for zip_info in current_jar_as_zip.infolist(): if zip_info.is_dir(): continue - if zip_info.filename in ("META-INF/LICENSE.txt", "META-INF/LICENSE", "META-INF/LICENSE.md"): + if zip_info.filename in ("META-INF/LICENSE.txt", "META-INF/LICENSE", "META-INF/LICENSE.md", "LICENSE"): license_found = True dependency_with_license_count += 1 # Extract license to the target directory diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java b/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java index c73337d1e..7e482679a 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023 Snowflake Computing Inc. All rights reserved. + * Copyright (c) 2023-2024 Snowflake Computing Inc. All rights reserved. 
*/ package net.snowflake.ingest.streaming.internal; @@ -18,21 +18,26 @@ public class ClientBufferParameters { private Constants.BdecParquetCompression bdecParquetCompression; + private boolean isIcebergMode; + /** * Private constructor used for test methods * * @param maxChunkSizeInBytes maximum chunk size in bytes * @param maxAllowedRowSizeInBytes maximum row size in bytes + * @param isIcebergMode */ private ClientBufferParameters( long maxChunkSizeInBytes, long maxAllowedRowSizeInBytes, Constants.BdecParquetCompression bdecParquetCompression, - boolean enableNewJsonParsingLogic) { + boolean enableNewJsonParsingLogic, + boolean isIcebergMode) { this.maxChunkSizeInBytes = maxChunkSizeInBytes; this.maxAllowedRowSizeInBytes = maxAllowedRowSizeInBytes; this.bdecParquetCompression = bdecParquetCompression; this.enableNewJsonParsingLogic = enableNewJsonParsingLogic; + this.isIcebergMode = isIcebergMode; } /** @param clientInternal reference to the client object where the relevant parameters are set */ @@ -49,28 +54,34 @@ public ClientBufferParameters(SnowflakeStreamingIngestClientInternal clientInter clientInternal != null ? clientInternal.getParameterProvider().getBdecParquetCompressionAlgorithm() : ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT; - this.enableNewJsonParsingLogic = clientInternal != null ? clientInternal.getParameterProvider().isEnableNewJsonParsingLogic() : ParameterProvider.ENABLE_NEW_JSON_PARSING_LOGIC_DEFAULT; + this.isIcebergMode = + clientInternal != null + ? clientInternal.isIcebergMode() + : ParameterProvider.IS_ICEBERG_MODE_DEFAULT; } /** * @param maxChunkSizeInBytes maximum chunk size in bytes * @param maxAllowedRowSizeInBytes maximum row size in bytes + * @param isIcebergMode * @return ClientBufferParameters object */ public static ClientBufferParameters test_createClientBufferParameters( long maxChunkSizeInBytes, long maxAllowedRowSizeInBytes, Constants.BdecParquetCompression bdecParquetCompression, - boolean enableNewJsonParsingLogic) { + boolean enableNewJsonParsingLogic, + boolean isIcebergMode) { return new ClientBufferParameters( maxChunkSizeInBytes, maxAllowedRowSizeInBytes, bdecParquetCompression, - enableNewJsonParsingLogic); + enableNewJsonParsingLogic, + isIcebergMode); } public long getMaxChunkSizeInBytes() { @@ -88,4 +99,8 @@ public Constants.BdecParquetCompression getBdecParquetCompression() { public boolean isEnableNewJsonParsingLogic() { return enableNewJsonParsingLogic; } + + public boolean getIsIcebergMode() { + return isIcebergMode; + } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ColumnMetadata.java b/src/main/java/net/snowflake/ingest/streaming/internal/ColumnMetadata.java index 73b75da9e..1231247b5 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ColumnMetadata.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ColumnMetadata.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 Snowflake Computing Inc. All rights reserved. + * Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved. */ package net.snowflake.ingest.streaming.internal; @@ -22,6 +22,13 @@ class ColumnMetadata { private boolean nullable; private String collation; + /** + * The Json serialization of Iceberg data type of the column, see JSON serialization + * for more details. + */ + private String sourceIcebergDataType; + /** * The column ordinal is an internal id of the column used by server scanner for the column * identification. 
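[Editor's note] The new source_iceberg_data_type field above carries the column's Iceberg type as its JSON serialization: primitive types are bare JSON strings ("int", "string", "decimal(9,2)"), nested types are JSON objects. That is also why ParquetRowBuffer later in this diff compares against the quoted literal "\"string\"". Below is a minimal sketch of how the parser added later in this PR (IcebergDataTypeParser) consumes such strings; the demo class name is hypothetical, but the two parser methods are the ones introduced in this diff.

import net.snowflake.ingest.utils.IcebergDataTypeParser;
import org.apache.parquet.schema.Type;

public class IcebergTypeJsonDemo {
  public static void main(String[] args) {
    // Primitive Iceberg types arrive as quoted JSON strings; the parser maps
    // them to Parquet primitive types via Iceberg's TypeToMessageType.
    Type parquetType =
        IcebergDataTypeParser.parseIcebergDataTypeStringToParquetType(
            "\"decimal(9,2)\"", Type.Repetition.OPTIONAL, /* id= */ 1, "PRICE");
    System.out.println(parquetType); // e.g. optional int32 PRICE (DECIMAL(9,2)) = 1

    // Nested types are JSON objects; deserializeIcebergType understands them,
    // but parseIcebergDataTypeStringToParquetType rejects non-primitive types.
    System.out.println(
        IcebergDataTypeParser.deserializeIcebergType(
            "{\"type\":\"list\",\"element-id\":3,\"element\":\"long\",\"element-required\":true}"));
  }
}

Note that parseIcebergDataTypeStringToParquetType intentionally throws for struct/list/map input, matching the primitive-only column support this PR introduces.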
@@ -128,6 +135,15 @@ public Integer getOrdinal() { return ordinal; } + @JsonProperty("source_iceberg_data_type") + void setSourceIcebergDataType(String sourceIcebergDataType) { + this.sourceIcebergDataType = sourceIcebergDataType; + } + + public String getSourceIcebergDataType() { + return sourceIcebergDataType; + } + String getInternalName() { return internalName; } @@ -144,6 +160,7 @@ public String toString() { map.put("byte_length", this.byteLength); map.put("length", this.length); map.put("nullable", this.nullable); + map.put("source_iceberg_datatype", this.sourceIcebergDataType); return map.toString(); } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/DataValidationUtil.java b/src/main/java/net/snowflake/ingest/streaming/internal/DataValidationUtil.java index 310a711d0..6e3281997 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/DataValidationUtil.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/DataValidationUtil.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 Snowflake Computing Inc. All rights reserved. + * Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved. */ package net.snowflake.ingest.streaming.internal; @@ -19,6 +19,7 @@ import java.io.IOException; import java.math.BigDecimal; import java.math.BigInteger; +import java.math.RoundingMode; import java.nio.charset.StandardCharsets; import java.time.Instant; import java.time.LocalDate; @@ -718,7 +719,13 @@ static BigDecimal validateAndParseBigDecimal( || input instanceof Long) { return BigDecimal.valueOf(((Number) input).longValue()); } else if (input instanceof Float || input instanceof Double) { - return BigDecimal.valueOf(((Number) input).doubleValue()); + try { + return BigDecimal.valueOf(((Number) input).doubleValue()); + } catch (NumberFormatException e) { + /* NaN and infinity are not allowed */ + throw valueFormatNotAllowedException( + columnName, "NUMBER", "Not a valid number", insertRowIndex); + } } else if (input instanceof String) { try { final String stringInput = ((String) input).trim(); @@ -957,6 +964,66 @@ static double validateAndParseReal(String columnName, Object input, long insertR columnName, input.getClass(), "REAL", new String[] {"Number", "String"}, insertRowIndex); } + /** + * Validates and parses input Iceberg INT column. Allowed Java types: + * + *
+ * <ul>
+ *   <li>Number
+ *   <li>String
+ * </ul>
+ * + * @param columnName Column name, used in validation error messages + * @param input Object to validate and parse + * @param insertRowIndex Row index for error reporting + * @return Parsed integer + */ + static int validateAndParseIcebergInt(String columnName, Object input, long insertRowIndex) { + BigDecimal roundedValue = + validateAndParseBigDecimal(columnName, input, insertRowIndex) + .setScale(0, RoundingMode.HALF_UP); + try { + return roundedValue.intValueExact(); + } catch (ArithmeticException e) { + /* overflow */ + throw new SFException( + ErrorCode.INVALID_VALUE_ROW, + String.format( + "Number out of representable inclusive range of integers between %d and %d," + + " rowIndex:%d", + Integer.MIN_VALUE, Integer.MAX_VALUE, insertRowIndex)); + } + } + + /** + * Validates and parses input Iceberg LONG column. Allowed Java types: + * + *
+ * <ul>
+ *   <li>Number
+ *   <li>String
+ * </ul>
+ * + * @param columnName Column name, used in validation error messages + * @param input Object to validate and parse + * @param insertRowIndex Row index for error reporting + * @return Parsed long + */ + static long validateAndParseIcebergLong(String columnName, Object input, long insertRowIndex) { + BigDecimal roundedValue = + validateAndParseBigDecimal(columnName, input, insertRowIndex) + .setScale(0, RoundingMode.HALF_UP); + try { + return roundedValue.longValueExact(); + } catch (ArithmeticException e) { + /* overflow */ + throw new SFException( + ErrorCode.INVALID_VALUE_ROW, + String.format( + "Number out of representable inclusive range of integers between %d and %d," + + " rowIndex:%d", + Long.MIN_VALUE, Long.MAX_VALUE, insertRowIndex)); + } + } + /** * Validate and parse input to integer output, 1=true, 0=false. String values converted to boolean * according to https://docs.snowflake.com/en/sql-reference/functions/to_boolean.html#usage-notes @@ -1003,6 +1070,16 @@ static void checkValueInRange( } } + static void checkFixedLengthByteArray(byte[] bytes, int length, final long insertRowIndex) { + if (bytes.length != length) { + throw new SFException( + ErrorCode.INVALID_FORMAT_ROW, + String.format( + "Binary length mismatch: expected=%d, actual=%d, rowIndex:%d", + length, bytes.length, insertRowIndex)); + } + } + static Set allowedBooleanStringsLowerCased = Sets.newHashSet("1", "0", "yes", "no", "y", "n", "t", "f", "true", "false", "on", "off"); diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParser.java b/src/main/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParser.java new file mode 100644 index 000000000..18a66f4d5 --- /dev/null +++ b/src/main/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParser.java @@ -0,0 +1,301 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + +package net.snowflake.ingest.streaming.internal; + +import static net.snowflake.ingest.streaming.internal.DataValidationUtil.checkFixedLengthByteArray; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.math.RoundingMode; +import java.nio.charset.StandardCharsets; +import java.time.ZoneId; +import java.util.Arrays; +import java.util.Optional; +import net.snowflake.ingest.utils.Constants; +import net.snowflake.ingest.utils.ErrorCode; +import net.snowflake.ingest.utils.SFException; +import net.snowflake.ingest.utils.Utils; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.DateLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.StringLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.TimeLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; +import org.apache.parquet.schema.Type.Repetition; + +/** Parses a user Iceberg column value into Parquet internal representation for buffering. */ +class IcebergParquetValueParser { + + /** + * Parses a user column value into Parquet internal representation for buffering. 
+ * + * @param value column value provided by user in a row + * @param type Parquet column type + * @param stats column stats to update + * @param defaultTimezone default timezone to use for timestamp parsing + * @param insertRowsCurrIndex Row index corresponding the row to parse (w.r.t input rows in + * insertRows API, and not buffered row) + * @return parsed value and byte size of Parquet internal representation + */ + static ParquetBufferValue parseColumnValueToParquet( + Object value, + Type type, + RowBufferStats stats, + ZoneId defaultTimezone, + long insertRowsCurrIndex) { + Utils.assertNotNull("Parquet column stats", stats); + float estimatedParquetSize = 0F; + if (value != null) { + estimatedParquetSize += ParquetBufferValue.DEFINITION_LEVEL_ENCODING_BYTE_LEN; + PrimitiveType primitiveType = type.asPrimitiveType(); + switch (primitiveType.getPrimitiveTypeName()) { + case BOOLEAN: + int intValue = + DataValidationUtil.validateAndParseBoolean( + type.getName(), value, insertRowsCurrIndex); + value = intValue > 0; + stats.addIntValue(BigInteger.valueOf(intValue)); + estimatedParquetSize += ParquetBufferValue.BIT_ENCODING_BYTE_LEN; + break; + case INT32: + int intVal = getInt32Value(value, primitiveType, insertRowsCurrIndex); + value = intVal; + stats.addIntValue(BigInteger.valueOf(intVal)); + estimatedParquetSize += 4; + break; + case INT64: + long longVal = getInt64Value(value, primitiveType, defaultTimezone, insertRowsCurrIndex); + value = longVal; + stats.addIntValue(BigInteger.valueOf(longVal)); + estimatedParquetSize += 8; + break; + case FLOAT: + float floatVal = + (float) + DataValidationUtil.validateAndParseReal( + type.getName(), value, insertRowsCurrIndex); + value = floatVal; + stats.addRealValue((double) floatVal); + estimatedParquetSize += 4; + break; + case DOUBLE: + double doubleVal = + DataValidationUtil.validateAndParseReal(type.getName(), value, insertRowsCurrIndex); + value = doubleVal; + stats.addRealValue(doubleVal); + estimatedParquetSize += 8; + break; + case BINARY: + byte[] byteVal = getBinaryValue(value, primitiveType, stats, insertRowsCurrIndex); + value = byteVal; + estimatedParquetSize += + ParquetBufferValue.BYTE_ARRAY_LENGTH_ENCODING_BYTE_LEN + byteVal.length; + break; + case FIXED_LEN_BYTE_ARRAY: + byte[] fixedLenByteArrayVal = + getFixedLenByteArrayValue(value, primitiveType, stats, insertRowsCurrIndex); + value = fixedLenByteArrayVal; + estimatedParquetSize += + ParquetBufferValue.BYTE_ARRAY_LENGTH_ENCODING_BYTE_LEN + fixedLenByteArrayVal.length; + break; + default: + throw new SFException( + ErrorCode.UNKNOWN_DATA_TYPE, + type.getLogicalTypeAnnotation(), + primitiveType.getPrimitiveTypeName()); + } + } + + if (value == null) { + if (type.isRepetition(Repetition.REQUIRED)) { + throw new SFException( + ErrorCode.INVALID_FORMAT_ROW, type.getName(), "Passed null to non nullable field"); + } + stats.incCurrentNullCount(); + } + + return new ParquetBufferValue(value, estimatedParquetSize); + } + + /** + * Parses an int32 value based on Parquet logical type. 
+ * + * @param value column value provided by user in a row + * @param type Parquet column type + * @param insertRowsCurrIndex Used for logging the row of index given in insertRows API + * @return parsed int32 value + */ + private static int getInt32Value( + Object value, PrimitiveType type, final long insertRowsCurrIndex) { + LogicalTypeAnnotation logicalTypeAnnotation = type.getLogicalTypeAnnotation(); + if (logicalTypeAnnotation == null) { + return DataValidationUtil.validateAndParseIcebergInt( + type.getName(), value, insertRowsCurrIndex); + } + if (logicalTypeAnnotation instanceof DecimalLogicalTypeAnnotation) { + return getDecimalValue(value, type, insertRowsCurrIndex).unscaledValue().intValue(); + } + if (logicalTypeAnnotation instanceof DateLogicalTypeAnnotation) { + return DataValidationUtil.validateAndParseDate(type.getName(), value, insertRowsCurrIndex); + } + throw new SFException( + ErrorCode.UNKNOWN_DATA_TYPE, logicalTypeAnnotation, type.getPrimitiveTypeName()); + } + + /** + * Parses an int64 value based on Parquet logical type. + * + * @param value column value provided by user in a row + * @param type Parquet column type + * @param insertRowsCurrIndex Used for logging the row of index given in insertRows API + * @return parsed int64 value + */ + private static long getInt64Value( + Object value, PrimitiveType type, ZoneId defaultTimezone, final long insertRowsCurrIndex) { + LogicalTypeAnnotation logicalTypeAnnotation = type.getLogicalTypeAnnotation(); + if (logicalTypeAnnotation == null) { + return DataValidationUtil.validateAndParseIcebergLong( + type.getName(), value, insertRowsCurrIndex); + } + if (logicalTypeAnnotation instanceof DecimalLogicalTypeAnnotation) { + return getDecimalValue(value, type, insertRowsCurrIndex).unscaledValue().longValue(); + } + if (logicalTypeAnnotation instanceof TimeLogicalTypeAnnotation) { + return DataValidationUtil.validateAndParseTime( + type.getName(), + value, + timeUnitToScale(((TimeLogicalTypeAnnotation) logicalTypeAnnotation).getUnit()), + insertRowsCurrIndex) + .longValue(); + } + if (logicalTypeAnnotation instanceof TimestampLogicalTypeAnnotation) { + boolean includeTimeZone = + ((TimestampLogicalTypeAnnotation) logicalTypeAnnotation).isAdjustedToUTC(); + return DataValidationUtil.validateAndParseTimestamp( + type.getName(), + value, + timeUnitToScale(((TimestampLogicalTypeAnnotation) logicalTypeAnnotation).getUnit()), + defaultTimezone, + !includeTimeZone, + insertRowsCurrIndex) + .toBinary(false) + .longValue(); + } + throw new SFException( + ErrorCode.UNKNOWN_DATA_TYPE, + logicalTypeAnnotation, + type.asPrimitiveType().getPrimitiveTypeName()); + } + + /** + * Converts an Iceberg binary or string column to its byte array representation. 
+ * + * @param value value to parse + * @param type Parquet column type + * @param stats column stats to update + * @param insertRowsCurrIndex Used for logging the row of index given in insertRows API + * @return string representation + */ + private static byte[] getBinaryValue( + Object value, PrimitiveType type, RowBufferStats stats, final long insertRowsCurrIndex) { + LogicalTypeAnnotation logicalTypeAnnotation = type.getLogicalTypeAnnotation(); + if (logicalTypeAnnotation == null) { + byte[] bytes = + DataValidationUtil.validateAndParseBinary( + type.getName(), + value, + Optional.of(Constants.BINARY_COLUMN_MAX_SIZE), + insertRowsCurrIndex); + stats.addBinaryValue(bytes); + return bytes; + } + if (logicalTypeAnnotation instanceof StringLogicalTypeAnnotation) { + String string = + DataValidationUtil.validateAndParseString( + type.getName(), + value, + Optional.of(Constants.VARCHAR_COLUMN_MAX_SIZE), + insertRowsCurrIndex); + stats.addStrValue(string); + return string.getBytes(StandardCharsets.UTF_8); + } + throw new SFException( + ErrorCode.UNKNOWN_DATA_TYPE, logicalTypeAnnotation, type.getPrimitiveTypeName()); + } + + /** + * Converts an Iceberg fixed length byte array column to its byte array representation. + * + * @param value value to parse + * @param type Parquet column type + * @param stats column stats to update + * @param insertRowsCurrIndex Used for logging the row of index given in insertRows API + * @return string representation + */ + private static byte[] getFixedLenByteArrayValue( + Object value, PrimitiveType type, RowBufferStats stats, final long insertRowsCurrIndex) { + LogicalTypeAnnotation logicalTypeAnnotation = type.getLogicalTypeAnnotation(); + int length = type.getTypeLength(); + byte[] bytes = null; + if (logicalTypeAnnotation == null) { + bytes = + DataValidationUtil.validateAndParseBinary( + type.getName(), value, Optional.of(length), insertRowsCurrIndex); + stats.addBinaryValue(bytes); + } + if (logicalTypeAnnotation instanceof DecimalLogicalTypeAnnotation) { + BigInteger bigIntegerVal = getDecimalValue(value, type, insertRowsCurrIndex).unscaledValue(); + stats.addIntValue(bigIntegerVal); + bytes = bigIntegerVal.toByteArray(); + if (bytes.length < length) { + byte[] newBytes = new byte[length]; + Arrays.fill(newBytes, (byte) (bytes[0] < 0 ? -1 : 0)); + System.arraycopy(bytes, 0, newBytes, length - bytes.length, bytes.length); + bytes = newBytes; + } + } + if (bytes != null) { + checkFixedLengthByteArray(bytes, length, insertRowsCurrIndex); + return bytes; + } + throw new SFException( + ErrorCode.UNKNOWN_DATA_TYPE, logicalTypeAnnotation, type.getPrimitiveTypeName()); + } + + /** + * Converts a decimal value to its BigDecimal representation. 
+ * + * @param value value to parse + * @param type Parquet column type + * @param insertRowsCurrIndex Used for logging the row of index given in insertRows API + * @return BigDecimal representation + */ + private static BigDecimal getDecimalValue( + Object value, PrimitiveType type, final long insertRowsCurrIndex) { + int scale = ((DecimalLogicalTypeAnnotation) type.getLogicalTypeAnnotation()).getScale(); + int precision = ((DecimalLogicalTypeAnnotation) type.getLogicalTypeAnnotation()).getPrecision(); + BigDecimal bigDecimalValue = + DataValidationUtil.validateAndParseBigDecimal(type.getName(), value, insertRowsCurrIndex); + bigDecimalValue = bigDecimalValue.setScale(scale, RoundingMode.HALF_UP); + DataValidationUtil.checkValueInRange(bigDecimalValue, scale, precision, insertRowsCurrIndex); + return bigDecimalValue; + } + + private static int timeUnitToScale(LogicalTypeAnnotation.TimeUnit timeUnit) { + switch (timeUnit) { + case MILLIS: + return 3; + case MICROS: + return 6; + case NANOS: + return 9; + default: + throw new SFException( + ErrorCode.INTERNAL_ERROR, String.format("Unknown time unit: %s", timeUnit)); + } + } +} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/OpenChannelRequestInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/OpenChannelRequestInternal.java index ff53f6729..c3c427c68 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/OpenChannelRequestInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/OpenChannelRequestInternal.java @@ -32,6 +32,10 @@ class OpenChannelRequestInternal implements IStreamingIngestRequest { @JsonProperty("write_mode") private String writeMode; + @JsonInclude(JsonInclude.Include.NON_NULL) + @JsonProperty("is_iceberg") + private boolean isIceberg; + @JsonInclude(JsonInclude.Include.NON_NULL) @JsonProperty("offset_token") private String offsetToken; @@ -44,6 +48,7 @@ class OpenChannelRequestInternal implements IStreamingIngestRequest { String table, String channel, Constants.WriteMode writeMode, + boolean isIceberg, String offsetToken) { this.requestId = requestId; this.role = role; @@ -52,6 +57,7 @@ class OpenChannelRequestInternal implements IStreamingIngestRequest { this.table = table; this.channel = channel; this.writeMode = writeMode.name(); + this.isIceberg = isIceberg; this.offsetToken = offsetToken; } @@ -83,6 +89,10 @@ String getWriteMode() { return writeMode; } + boolean getIsIceberg() { + return isIceberg; + } + String getOffsetToken() { return offsetToken; } @@ -91,7 +101,7 @@ String getOffsetToken() { public String getStringForLogging() { return String.format( "OpenChannelRequestInternal(requestId=%s, role=%s, db=%s, schema=%s, table=%s, channel=%s," - + " writeMode=%s)", - requestId, role, database, schema, table, channel, writeMode); + + " writeMode=%s, isIceberg=%s)", + requestId, role, database, schema, table, channel, writeMode, isIceberg); } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetBufferValue.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetBufferValue.java new file mode 100644 index 000000000..f89de0aa7 --- /dev/null +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetBufferValue.java @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + +package net.snowflake.ingest.streaming.internal; + +/** Parquet internal value representation for buffering. 
*/ +class ParquetBufferValue { + // Parquet uses BitPacking to encode boolean, hence 1 bit per value + public static final float BIT_ENCODING_BYTE_LEN = 1.0f / 8; + + /** + * On average parquet needs 2 bytes / 8 values for the RLE+bitpack encoded definition level. + * + *
+ * <p>There are two cases how the definition level (0 for null values, 1 for non-null values) is
+ * encoded:
+ *
+ * <ul>
+ *   <li>If there are at least 8 repeated values in a row, they are run-length encoded (length +
+ *       value itself), e.g. 11111111 -> 8 1
+ *   <li>If there are fewer than 8 repeated values, they are written in a group as part of a
+ *       bit-packed run, e.g. 1111 -> 15. A bit-packed run ends when either 64 groups of 8 values
+ *       have been written or a new RLE run starts.
+ *       <p>To distinguish between an RLE run and a bit-packed run, 1 extra byte is written as a
+ *       header when a bit-packed run starts.
+ * </ul>
+ *
+ * <p>For more details see ColumnWriterV1#createDLWriter and {@link
+ * org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder#writeInt(int)}
+ */ + public static final float DEFINITION_LEVEL_ENCODING_BYTE_LEN = 2.0f / 8; + + // Parquet stores length in 4 bytes before the actual data bytes + public static final int BYTE_ARRAY_LENGTH_ENCODING_BYTE_LEN = 4; + private final Object value; + private final float size; + + ParquetBufferValue(Object value, float size) { + this.value = value; + this.size = size; + } + + Object getValue() { + return value; + } + + float getSize() { + return size; + } +} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetColumn.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetColumn.java new file mode 100644 index 000000000..68ed7a8c9 --- /dev/null +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetColumn.java @@ -0,0 +1,20 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + +package net.snowflake.ingest.streaming.internal; + +import org.apache.parquet.schema.Type; + +/** Represents a column in a Parquet file. */ +class ParquetColumn { + final ColumnMetadata columnMetadata; + final int index; + final Type type; + + ParquetColumn(ColumnMetadata columnMetadata, int index, Type type) { + this.columnMetadata = columnMetadata; + this.index = index; + this.type = type; + } +} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java index bcd01aea3..395bea98f 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022 Snowflake Computing Inc. All rights reserved. + * Copyright (c) 2022-2024 Snowflake Computing Inc. All rights reserved. 
*/ package net.snowflake.ingest.streaming.internal; @@ -13,6 +13,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.function.Consumer; @@ -25,7 +26,6 @@ import net.snowflake.ingest.utils.ErrorCode; import net.snowflake.ingest.utils.SFException; import org.apache.parquet.schema.MessageType; -import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Type; /** @@ -81,14 +81,13 @@ public void setupSchema(List columns) { int id = 1; for (ColumnMetadata column : columns) { validateColumnCollation(column); - ParquetTypeGenerator.ParquetTypeInfo typeInfo = - ParquetTypeGenerator.generateColumnParquetTypeInfo(column, id); + ParquetTypeInfo typeInfo = ParquetTypeGenerator.generateColumnParquetTypeInfo(column, id); parquetTypes.add(typeInfo.getParquetType()); this.metadata.putAll(typeInfo.getMetadata()); int columnIndex = parquetTypes.size() - 1; fieldIndex.put( column.getInternalName(), - new ParquetColumn(column, columnIndex, typeInfo.getPrimitiveTypeName())); + new ParquetColumn(column, columnIndex, typeInfo.getParquetType())); if (!column.getNullable()) { addNonNullableFieldName(column.getInternalName()); } @@ -172,15 +171,18 @@ private float addRow( RowBufferStats forkedStats = statsMap.get(columnName).forkEmpty(); forkedStatsMap.put(columnName, forkedStats); ColumnMetadata column = parquetColumn.columnMetadata; - ParquetValueParser.ParquetBufferValue valueWithSize = - ParquetValueParser.parseColumnValueToParquet( - value, - column, - parquetColumn.type, - forkedStats, - defaultTimezone, - insertRowsCurrIndex, - clientBufferParameters.isEnableNewJsonParsingLogic()); + ParquetBufferValue valueWithSize = + (clientBufferParameters.getIsIcebergMode() + ? IcebergParquetValueParser.parseColumnValueToParquet( + value, parquetColumn.type, forkedStats, defaultTimezone, insertRowsCurrIndex) + : SnowflakeParquetValueParser.parseColumnValueToParquet( + value, + column, + parquetColumn.type.asPrimitiveType().getPrimitiveTypeName(), + forkedStats, + defaultTimezone, + insertRowsCurrIndex, + clientBufferParameters.isEnableNewJsonParsingLogic())); indexedRow[colIndex] = valueWithSize.getValue(); size += valueWithSize.getSize(); } @@ -264,6 +266,10 @@ Object getVectorValueAt(String column, int index) { if (logicalType == ColumnLogicalType.BINARY && value != null) { value = value instanceof String ? ((String) value).getBytes(StandardCharsets.UTF_8) : value; } + /* Mismatch between Iceberg string & FDN String */ + if (Objects.equals(columnMetadata.getSourceIcebergDataType(), "\"string\"")) { + value = value instanceof byte[] ? 
new String((byte[]) value, StandardCharsets.UTF_8) : value; + } return value; } @@ -289,17 +295,4 @@ public Flusher createFlusher() { clientBufferParameters.getMaxChunkSizeInBytes(), clientBufferParameters.getBdecParquetCompression()); } - - private static class ParquetColumn { - final ColumnMetadata columnMetadata; - final int index; - final PrimitiveType.PrimitiveTypeName type; - - private ParquetColumn( - ColumnMetadata columnMetadata, int index, PrimitiveType.PrimitiveTypeName type) { - this.columnMetadata = columnMetadata; - this.index = index; - this.type = type; - } - } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetTypeGenerator.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetTypeGenerator.java index c6d0e87c3..d4b70f397 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetTypeGenerator.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetTypeGenerator.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022 Snowflake Computing Inc. All rights reserved. + * Copyright (c) 2022-2024 Snowflake Computing Inc. All rights reserved. */ package net.snowflake.ingest.streaming.internal; @@ -10,6 +10,7 @@ import java.util.Map; import java.util.Set; import net.snowflake.ingest.utils.ErrorCode; +import net.snowflake.ingest.utils.IcebergDataTypeParser; import net.snowflake.ingest.utils.SFException; import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.PrimitiveType; @@ -19,35 +20,6 @@ /** Generates the Parquet types for the Snowflake's column types */ public class ParquetTypeGenerator { - /** - * Util class that contains Parquet type and other metadata for that type needed by the Snowflake - * server side scanner - */ - static class ParquetTypeInfo { - private Type parquetType; - private Map metadata; - - public Type getParquetType() { - return this.parquetType; - } - - public Map getMetadata() { - return this.metadata; - } - - public void setParquetType(Type parquetType) { - this.parquetType = parquetType; - } - - public void setMetadata(Map metadata) { - this.metadata = metadata; - } - - public PrimitiveType.PrimitiveTypeName getPrimitiveTypeName() { - return parquetType.asPrimitiveType().getPrimitiveTypeName(); - } - } - private static final Set TIME_SUPPORTED_PHYSICAL_TYPES = new HashSet<>( Arrays.asList( @@ -69,23 +41,10 @@ public PrimitiveType.PrimitiveTypeName getPrimitiveTypeName() { */ static ParquetTypeInfo generateColumnParquetTypeInfo(ColumnMetadata column, int id) { id = column.getOrdinal() == null ? id : column.getOrdinal(); - ParquetTypeInfo res = new ParquetTypeInfo(); Type parquetType; Map metadata = new HashMap<>(); String name = column.getInternalName(); - AbstractRowBuffer.ColumnPhysicalType physicalType; - AbstractRowBuffer.ColumnLogicalType logicalType; - try { - physicalType = AbstractRowBuffer.ColumnPhysicalType.valueOf(column.getPhysicalType()); - logicalType = AbstractRowBuffer.ColumnLogicalType.valueOf(column.getLogicalType()); - } catch (IllegalArgumentException e) { - throw new SFException( - ErrorCode.UNKNOWN_DATA_TYPE, column.getLogicalType(), column.getPhysicalType()); - } - - metadata.put(Integer.toString(id), logicalType.getOrdinal() + "," + physicalType.getOrdinal()); - // Parquet Type.Repetition in general supports repeated values for the same row column, like a // list of values. 
// This generator uses only either 0 or 1 value for nullable data type (OPTIONAL: 0 or none @@ -94,73 +53,94 @@ static ParquetTypeInfo generateColumnParquetTypeInfo(ColumnMetadata column, int Type.Repetition repetition = column.getNullable() ? Type.Repetition.OPTIONAL : Type.Repetition.REQUIRED; - // Handle differently depends on the column logical and physical types - switch (logicalType) { - case FIXED: - parquetType = getFixedColumnParquetType(column, id, physicalType, repetition); - break; - case ARRAY: - case OBJECT: - case VARIANT: - // mark the column metadata as being an object json for the server side scanner - metadata.put(id + ":obj_enc", "1"); - // parquetType is same as the next one - case ANY: - case CHAR: - case TEXT: - case BINARY: - parquetType = - Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, repetition) - .as(LogicalTypeAnnotation.stringType()) - .id(id) - .named(name); - break; - case TIMESTAMP_LTZ: - case TIMESTAMP_NTZ: - case TIMESTAMP_TZ: - parquetType = - getTimeColumnParquetType( - column.getScale(), - physicalType, - logicalType, - TIMESTAMP_SUPPORTED_PHYSICAL_TYPES, - repetition, - id, - name); - break; - case DATE: - parquetType = - Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition) - .as(LogicalTypeAnnotation.dateType()) - .id(id) - .named(name); - break; - case TIME: - parquetType = - getTimeColumnParquetType( - column.getScale(), - physicalType, - logicalType, - TIME_SUPPORTED_PHYSICAL_TYPES, - repetition, - id, - name); - break; - case BOOLEAN: - parquetType = - Types.primitive(PrimitiveType.PrimitiveTypeName.BOOLEAN, repetition).id(id).named(name); - break; - case REAL: - parquetType = - Types.primitive(PrimitiveType.PrimitiveTypeName.DOUBLE, repetition).id(id).named(name); - break; - default: + if (column.getSourceIcebergDataType() != null) { + parquetType = + IcebergDataTypeParser.parseIcebergDataTypeStringToParquetType( + column.getSourceIcebergDataType(), repetition, id, name); + } else { + AbstractRowBuffer.ColumnPhysicalType physicalType; + AbstractRowBuffer.ColumnLogicalType logicalType; + try { + physicalType = AbstractRowBuffer.ColumnPhysicalType.valueOf(column.getPhysicalType()); + logicalType = AbstractRowBuffer.ColumnLogicalType.valueOf(column.getLogicalType()); + } catch (IllegalArgumentException e) { throw new SFException( ErrorCode.UNKNOWN_DATA_TYPE, column.getLogicalType(), column.getPhysicalType()); + } + + metadata.put( + Integer.toString(id), logicalType.getOrdinal() + "," + physicalType.getOrdinal()); + + // Handle differently depends on the column logical and physical types + switch (logicalType) { + case FIXED: + parquetType = getFixedColumnParquetType(column, id, physicalType, repetition); + break; + case ARRAY: + case OBJECT: + case VARIANT: + // mark the column metadata as being an object json for the server side scanner + metadata.put(id + ":obj_enc", "1"); + // parquetType is same as the next one + case ANY: + case CHAR: + case TEXT: + case BINARY: + parquetType = + Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, repetition) + .as(LogicalTypeAnnotation.stringType()) + .id(id) + .named(name); + break; + case TIMESTAMP_LTZ: + case TIMESTAMP_NTZ: + case TIMESTAMP_TZ: + parquetType = + getTimeColumnParquetType( + column.getScale(), + physicalType, + logicalType, + TIMESTAMP_SUPPORTED_PHYSICAL_TYPES, + repetition, + id, + name); + break; + case DATE: + parquetType = + Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition) + .as(LogicalTypeAnnotation.dateType()) + .id(id) + .named(name); + 
break; + case TIME: + parquetType = + getTimeColumnParquetType( + column.getScale(), + physicalType, + logicalType, + TIME_SUPPORTED_PHYSICAL_TYPES, + repetition, + id, + name); + break; + case BOOLEAN: + parquetType = + Types.primitive(PrimitiveType.PrimitiveTypeName.BOOLEAN, repetition) + .id(id) + .named(name); + break; + case REAL: + parquetType = + Types.primitive(PrimitiveType.PrimitiveTypeName.DOUBLE, repetition) + .id(id) + .named(name); + break; + default: + throw new SFException( + ErrorCode.UNKNOWN_DATA_TYPE, column.getLogicalType(), column.getPhysicalType()); + } } - res.setParquetType(parquetType); - res.setMetadata(metadata); - return res; + return new ParquetTypeInfo(parquetType, metadata); } /** diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetTypeInfo.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetTypeInfo.java new file mode 100644 index 000000000..9e98bbcc9 --- /dev/null +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetTypeInfo.java @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + +package net.snowflake.ingest.streaming.internal; + +import java.util.Map; +import org.apache.parquet.schema.Type; + +/** + * Util class that contains Parquet type and other metadata for that type needed by the Snowflake + * server side scanner + */ +class ParquetTypeInfo { + private final Type parquetType; + private final Map metadata; + + ParquetTypeInfo(Type parquetType, Map metadata) { + this.parquetType = parquetType; + this.metadata = metadata; + } + + public Type getParquetType() { + return this.parquetType; + } + + public Map getMetadata() { + return this.metadata; + } +} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/RegisterBlobRequest.java b/src/main/java/net/snowflake/ingest/streaming/internal/RegisterBlobRequest.java index fcb7edf4f..79a91943d 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/RegisterBlobRequest.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/RegisterBlobRequest.java @@ -4,6 +4,7 @@ package net.snowflake.ingest.streaming.internal; +import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import java.util.List; import java.util.stream.Collectors; @@ -19,10 +20,15 @@ class RegisterBlobRequest implements IStreamingIngestRequest { @JsonProperty("blobs") private List blobs; - RegisterBlobRequest(String requestId, String role, List blobs) { + @JsonInclude(JsonInclude.Include.NON_NULL) + @JsonProperty("is_iceberg") + private boolean isIceberg; + + RegisterBlobRequest(String requestId, String role, List blobs, boolean isIceberg) { this.requestId = requestId; this.role = role; this.blobs = blobs; + this.isIceberg = isIceberg; } String getRequestId() { @@ -37,6 +43,10 @@ List getBlobs() { return blobs; } + boolean getIsIceberg() { + return isIceberg; + } + @Override public String getStringForLogging() { return String.format( diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetValueParser.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeParquetValueParser.java similarity index 86% rename from src/main/java/net/snowflake/ingest/streaming/internal/ParquetValueParser.java rename to src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeParquetValueParser.java index 298ec2ba2..ba4a38b68 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetValueParser.java +++ 
b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeParquetValueParser.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022 Snowflake Computing Inc. All rights reserved. + * Copyright (c) 2022-2024 Snowflake Computing Inc. All rights reserved. */ package net.snowflake.ingest.streaming.internal; @@ -15,58 +15,8 @@ import net.snowflake.ingest.utils.Utils; import org.apache.parquet.schema.PrimitiveType; -/** Parses a user column value into Parquet internal representation for buffering. */ -class ParquetValueParser { - - // Parquet uses BitPacking to encode boolean, hence 1 bit per value - public static final float BIT_ENCODING_BYTE_LEN = 1.0f / 8; - - /** - * On average parquet needs 2 bytes / 8 values for the RLE+bitpack encoded definition level. - * - *
- * <p>There are two cases how definition level (0 for null values, 1 for non-null values) is
- * encoded:
- *
- * <ul>
- *   <li>If there are at least 8 repeated values in a row, they are run-length encoded (length +
- *       value itself). E.g. 11111111 -> 8 1
- *   <li>If there are less than 8 repeated values, they are written in group as part of a
- *       bit-length encoded run, e.g. 1111 -> 15 A bit-length encoded run ends when either 64
- *       groups of 8 values have been written or if a new RLE run starts.
- *       <p>To distinguish between RLE and bitpack run, there is 1 extra bytes written as header
- *       when a bitpack run starts.
- * </ul>
- *
- * <p>For more details see ColumnWriterV1#createDLWriter and {@link
- * org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder#writeInt(int)}
- *
- * <p>
Since we don't have nested types, repetition level is always 0 and is not stored at all by - * Parquet. - */ - public static final float DEFINITION_LEVEL_ENCODING_BYTE_LEN = 2.0f / 8; - - // Parquet stores length in 4 bytes before the actual data bytes - public static final int BYTE_ARRAY_LENGTH_ENCODING_BYTE_LEN = 4; - - /** Parquet internal value representation for buffering. */ - static class ParquetBufferValue { - private final Object value; - private final float size; - - ParquetBufferValue(Object value, float size) { - this.value = value; - this.size = size; - } - - Object getValue() { - return value; - } - - float getSize() { - return size; - } - } +/** Parses a user Snowflake column value into Parquet internal representation for buffering. */ +class SnowflakeParquetValueParser { /** * Parses a user column value into Parquet internal representation for buffering. @@ -89,7 +39,7 @@ static ParquetBufferValue parseColumnValueToParquet( boolean enableNewJsonParsingLogic) { Utils.assertNotNull("Parquet column stats", stats); float estimatedParquetSize = 0F; - estimatedParquetSize += DEFINITION_LEVEL_ENCODING_BYTE_LEN; + estimatedParquetSize += ParquetBufferValue.DEFINITION_LEVEL_ENCODING_BYTE_LEN; if (value != null) { AbstractRowBuffer.ColumnLogicalType logicalType = AbstractRowBuffer.ColumnLogicalType.valueOf(columnMetadata.getLogicalType()); @@ -102,7 +52,7 @@ static ParquetBufferValue parseColumnValueToParquet( columnMetadata.getName(), value, insertRowsCurrIndex); value = intValue > 0; stats.addIntValue(BigInteger.valueOf(intValue)); - estimatedParquetSize += BIT_ENCODING_BYTE_LEN; + estimatedParquetSize += ParquetBufferValue.BIT_ENCODING_BYTE_LEN; break; case INT32: int intVal = @@ -157,7 +107,8 @@ static ParquetBufferValue parseColumnValueToParquet( } } if (value != null) { - estimatedParquetSize += (BYTE_ARRAY_LENGTH_ENCODING_BYTE_LEN + length); + estimatedParquetSize += + (ParquetBufferValue.BYTE_ARRAY_LENGTH_ENCODING_BYTE_LEN + length); } break; @@ -415,6 +366,7 @@ private static String getBinaryValue( } return str; } + /** * Converts a binary value to its byte array representation. 
* diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java index be6b192b0..b476859d8 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java @@ -335,6 +335,7 @@ public SnowflakeStreamingIngestChannelInternal openChannel(OpenChannelRequest request.getFullyQualifiedTableName(), getName()); + OpenChannelResponse response = null; try { OpenChannelRequestInternal openChannelRequest = new OpenChannelRequestInternal( @@ -345,48 +346,56 @@ public SnowflakeStreamingIngestChannelInternal openChannel(OpenChannelRequest request.getTableName(), request.getChannelName(), Constants.WriteMode.CLOUD_STORAGE, + this.isIcebergMode, request.getOffsetToken()); - OpenChannelResponse response = snowflakeServiceClient.openChannel(openChannelRequest); - - logger.logInfo( - "Open channel request succeeded, channel={}, table={}, clientSequencer={}," - + " rowSequencer={}, client={}", - request.getChannelName(), - request.getFullyQualifiedTableName(), - response.getClientSequencer(), - response.getRowSequencer(), - getName()); - - // Channel is now registered, add it to the in-memory channel pool - SnowflakeStreamingIngestChannelInternal channel = - SnowflakeStreamingIngestChannelFactory.builder(response.getChannelName()) - .setDBName(response.getDBName()) - .setSchemaName(response.getSchemaName()) - .setTableName(response.getTableName()) - .setOffsetToken(response.getOffsetToken()) - .setRowSequencer(response.getRowSequencer()) - .setChannelSequencer(response.getClientSequencer()) - .setOwningClient(this) - .setEncryptionKey(response.getEncryptionKey()) - .setEncryptionKeyId(response.getEncryptionKeyId()) - .setOnErrorOption(request.getOnErrorOption()) - .setDefaultTimezone(request.getDefaultTimezone()) - .setOffsetTokenVerificationFunction(request.getOffsetTokenVerificationFunction()) - .build(); - - // Setup the row buffer schema - channel.setupSchema(response.getTableColumns()); - - // Add channel to the channel cache - this.channelCache.addChannel(channel); - this.storageManager.registerTable( - new TableRef(response.getDBName(), response.getSchemaName(), response.getTableName()), - response.getExternalVolumeLocation()); - - return channel; + response = snowflakeServiceClient.openChannel(openChannelRequest); } catch (IOException | IngestResponseException e) { throw new SFException(e, ErrorCode.OPEN_CHANNEL_FAILURE, e.getMessage()); } + + if (isIcebergMode + && response.getTableColumns().stream() + .anyMatch(c -> c.getSourceIcebergDataType() == null)) { + throw new SFException( + ErrorCode.INTERNAL_ERROR, "Iceberg table columns must have sourceIcebergDataType set."); + } + + logger.logInfo( + "Open channel request succeeded, channel={}, table={}, clientSequencer={}," + + " rowSequencer={}, client={}", + request.getChannelName(), + request.getFullyQualifiedTableName(), + response.getClientSequencer(), + response.getRowSequencer(), + getName()); + + // Channel is now registered, add it to the in-memory channel pool + SnowflakeStreamingIngestChannelInternal channel = + SnowflakeStreamingIngestChannelFactory.builder(response.getChannelName()) + .setDBName(response.getDBName()) + .setSchemaName(response.getSchemaName()) + .setTableName(response.getTableName()) + .setOffsetToken(response.getOffsetToken()) + 
.setRowSequencer(response.getRowSequencer()) + .setChannelSequencer(response.getClientSequencer()) + .setOwningClient(this) + .setEncryptionKey(response.getEncryptionKey()) + .setEncryptionKeyId(response.getEncryptionKeyId()) + .setOnErrorOption(request.getOnErrorOption()) + .setDefaultTimezone(request.getDefaultTimezone()) + .setOffsetTokenVerificationFunction(request.getOffsetTokenVerificationFunction()) + .build(); + + // Setup the row buffer schema + channel.setupSchema(response.getTableColumns()); + + // Add channel to the channel cache + this.channelCache.addChannel(channel); + this.storageManager.registerTable( + new TableRef(response.getDBName(), response.getSchemaName(), response.getTableName()), + response.getExternalVolumeLocation()); + + return channel; } @Override @@ -568,7 +577,8 @@ void registerBlobs(List blobs, final int executionCount) { new RegisterBlobRequest( this.storageManager.getClientPrefix() + "_" + counter.getAndIncrement(), this.role, - blobs); + blobs, + this.isIcebergMode); response = snowflakeServiceClient.registerBlob(request, executionCount); } catch (IOException | IngestResponseException e) { throw new SFException(e, ErrorCode.REGISTER_BLOB_FAILURE, e.getMessage()); @@ -915,6 +925,11 @@ public void setRefreshToken(String refreshToken) { } } + /** Return whether the client is streaming to Iceberg tables */ + boolean isIcebergMode() { + return isIcebergMode; + } + /** * Registers the performance metrics along with JVM memory and Threads. * diff --git a/src/main/java/net/snowflake/ingest/utils/Constants.java b/src/main/java/net/snowflake/ingest/utils/Constants.java index 534a1815e..35eed3469 100644 --- a/src/main/java/net/snowflake/ingest/utils/Constants.java +++ b/src/main/java/net/snowflake/ingest/utils/Constants.java @@ -62,6 +62,8 @@ public class Constants { public static final int STREAMING_INGEST_TELEMETRY_UPLOAD_INTERVAL_IN_SEC = 10; public static final long EP_NDV_UNKNOWN = -1L; public static final int MAX_OAUTH_REFRESH_TOKEN_RETRY = 3; + public static final int BINARY_COLUMN_MAX_SIZE = 8 * 1024 * 1024; + public static final int VARCHAR_COLUMN_MAX_SIZE = 16 * 1024 * 1024; // Channel level constants public static final String CHANNEL_STATUS_ENDPOINT = "/v1/streaming/channels/status/"; diff --git a/src/main/java/net/snowflake/ingest/utils/IcebergDataTypeParser.java b/src/main/java/net/snowflake/ingest/utils/IcebergDataTypeParser.java new file mode 100644 index 000000000..95e484b70 --- /dev/null +++ b/src/main/java/net/snowflake/ingest/utils/IcebergDataTypeParser.java @@ -0,0 +1,198 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + +package net.snowflake.ingest.utils; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import java.util.Iterator; +import java.util.List; +import javax.annotation.Nonnull; +import org.apache.iceberg.parquet.TypeToMessageType; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.JsonUtil; + +/** + * This class is used to Iceberg data type (include primitive types and nested types) serialization + * and deserialization. + * + *
+ * <p>
This code is modified from + * GlobalServices/modules/data-lake/datalake-api/src/main/java/com/snowflake/metadata/iceberg + * /IcebergDataTypeParser.java + */ +public class IcebergDataTypeParser { + private static final String TYPE = "type"; + private static final String STRUCT = "struct"; + private static final String LIST = "list"; + private static final String MAP = "map"; + private static final String FIELDS = "fields"; + private static final String ELEMENT = "element"; + private static final String KEY = "key"; + private static final String VALUE = "value"; + private static final String DOC = "doc"; + private static final String NAME = "name"; + private static final String ID = "id"; + private static final String ELEMENT_ID = "element-id"; + private static final String KEY_ID = "key-id"; + private static final String VALUE_ID = "value-id"; + private static final String REQUIRED = "required"; + private static final String ELEMENT_REQUIRED = "element-required"; + private static final String VALUE_REQUIRED = "value-required"; + + /** Object mapper for this class */ + private static final ObjectMapper MAPPER = new ObjectMapper(); + + /** Util class that contains the mapping between Iceberg data type and Parquet data type */ + private static final TypeToMessageType typeToMessageType = new TypeToMessageType(); + + /** + * Get Iceberg data type information by deserialization. + * + * @param icebergDataType string representation of Iceberg data type + * @param repetition repetition of the Parquet data type + * @param id column id + * @param name column name + * @return Iceberg data type + */ + public static org.apache.parquet.schema.Type parseIcebergDataTypeStringToParquetType( + String icebergDataType, + org.apache.parquet.schema.Type.Repetition repetition, + int id, + String name) { + Type icebergType = deserializeIcebergType(icebergDataType); + if (!icebergType.isPrimitiveType()) { + throw new IllegalArgumentException( + String.format("Snowflake supports only primitive Iceberg types, got '%s'", icebergType)); + } + return typeToMessageType.primitive(icebergType.asPrimitiveType(), repetition, id, name); + } + + /** + * Get Iceberg data type information by deserialization. + * + * @param icebergDataType string representation of Iceberg data type + * @return Iceberg data type + */ + public static Type deserializeIcebergType(String icebergDataType) { + try { + JsonNode json = MAPPER.readTree(icebergDataType); + return getTypeFromJson(json); + } catch (JsonProcessingException e) { + throw new IllegalArgumentException( + String.format("Failed to deserialize Iceberg data type: %s", icebergDataType)); + } + } + + /** + * Get corresponding Iceberg data type from JsonNode. + * + * @param jsonNode JsonNode parsed from Iceberg type string. 
+ * @return Iceberg data type + */ + public static Type getTypeFromJson(@Nonnull JsonNode jsonNode) { + if (jsonNode.isTextual()) { + return Types.fromPrimitiveString(jsonNode.asText()); + } else if (jsonNode.isObject()) { + if (!jsonNode.has(TYPE)) { + throw new IllegalArgumentException( + String.format("Missing key '%s' in schema: %s", TYPE, jsonNode)); + } + String type = jsonNode.get(TYPE).asText(); + if (STRUCT.equals(type)) { + return structFromJson(jsonNode); + } else if (LIST.equals(type)) { + return listFromJson(jsonNode); + } else if (MAP.equals(type)) { + return mapFromJson(jsonNode); + } + throw new IllegalArgumentException( + String.format("Cannot parse Iceberg type: %s, schema: %s", type, jsonNode)); + } + + throw new IllegalArgumentException("Cannot parse Iceberg type from schema: " + jsonNode); + } + + /** + * Get Iceberg struct type information from JsonNode. + * + * @param json JsonNode parsed from Iceberg type string. + * @return struct type + */ + public static @Nonnull Types.StructType structFromJson(@Nonnull JsonNode json) { + if (!json.has(FIELDS)) { + throw new IllegalArgumentException( + String.format("Missing key '%s' in schema: %s", FIELDS, json)); + } + JsonNode fieldArray = json.get(FIELDS); + Preconditions.checkArgument(fieldArray != null, "Field array cannot be null"); + Preconditions.checkArgument( + fieldArray.isArray(), "Cannot parse struct fields from non-array: %s", fieldArray); + + List fields = Lists.newArrayListWithExpectedSize(fieldArray.size()); + Iterator iterator = fieldArray.elements(); + while (iterator.hasNext()) { + JsonNode field = iterator.next(); + Preconditions.checkArgument( + field.isObject(), "Cannot parse struct field from non-object: %s", field); + + int id = JsonUtil.getInt(ID, field); + String name = JsonUtil.getString(NAME, field); + Type type = getTypeFromJson(field.get(TYPE)); + + String doc = JsonUtil.getStringOrNull(DOC, field); + boolean isRequired = JsonUtil.getBool(REQUIRED, field); + if (isRequired) { + fields.add(Types.NestedField.required(id, name, type, doc)); + } else { + fields.add(Types.NestedField.optional(id, name, type, doc)); + } + } + + return Types.StructType.of(fields); + } + + /** + * Get Iceberg list type information from JsonNode. + * + * @param json JsonNode parsed from Iceberg type string. + * @return list type + */ + public static Types.ListType listFromJson(JsonNode json) { + int elementId = JsonUtil.getInt(ELEMENT_ID, json); + Type elementType = getTypeFromJson(json.get(ELEMENT)); + boolean isRequired = JsonUtil.getBool(ELEMENT_REQUIRED, json); + + if (isRequired) { + return Types.ListType.ofRequired(elementId, elementType); + } else { + return Types.ListType.ofOptional(elementId, elementType); + } + } + + /** + * Get Iceberg map type from JsonNode. + * + * @param json JsonNode parsed from Iceberg type string. 
+ * @return map type + */ + public static Types.MapType mapFromJson(JsonNode json) { + int keyId = JsonUtil.getInt(KEY_ID, json); + Type keyType = getTypeFromJson(json.get(KEY)); + + int valueId = JsonUtil.getInt(VALUE_ID, json); + Type valueType = getTypeFromJson(json.get(VALUE)); + + boolean isRequired = JsonUtil.getBool(VALUE_REQUIRED, json); + + if (isRequired) { + return Types.MapType.ofRequired(keyId, valueId, keyType, valueType); + } else { + return Types.MapType.ofOptional(keyId, valueId, keyType, valueType); + } + } +} diff --git a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java index 885314b01..3d04aa1b2 100644 --- a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java +++ b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java @@ -79,6 +79,8 @@ public class ParameterProvider { public static final boolean ENABLE_NEW_JSON_PARSING_LOGIC_DEFAULT = true; + public static final boolean IS_ICEBERG_MODE_DEFAULT = false; + /** Map of parameter name to parameter value. This will be set by client/configure API Call. */ private final Map<String, Object> parameterMap = new HashMap<>(); diff --git a/src/main/java/org/apache/parquet/hadoop/BdecParquetWriter.java b/src/main/java/org/apache/parquet/hadoop/BdecParquetWriter.java index 58e7df4f3..eb5c20f03 100644 --- a/src/main/java/org/apache/parquet/hadoop/BdecParquetWriter.java +++ b/src/main/java/org/apache/parquet/hadoop/BdecParquetWriter.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022 Snowflake Computing Inc. All rights reserved. + * Copyright (c) 2022-2024 Snowflake Computing Inc. All rights reserved. */ package org.apache.parquet.hadoop; @@ -25,8 +25,10 @@ import org.apache.parquet.io.PositionOutputStream; import org.apache.parquet.io.api.Binary; import org.apache.parquet.io.api.RecordConsumer; +import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; /** * BDEC specific parquet writer. @@ -278,51 +280,58 @@ public void write(List<Object> values) { + ") : " + values); } - recordConsumer.startMessage(); + writeValues(values, schema); + recordConsumer.endMessage(); + } + + private void writeValues(List<Object> values, GroupType type) { + List<Type> cols = type.getFields(); for (int i = 0; i < cols.size(); ++i) { Object val = values.get(i); - // val.length() == 0 indicates a NULL value. if (val != null) { - String fieldName = cols.get(i).getPath()[0]; + String fieldName = cols.get(i).getName(); recordConsumer.startField(fieldName, i); - PrimitiveType.PrimitiveTypeName typeName = - cols.get(i).getPrimitiveType().getPrimitiveTypeName(); - switch (typeName) { - case BOOLEAN: - recordConsumer.addBoolean((boolean) val); - break; - case FLOAT: - recordConsumer.addFloat((float) val); - break; - case DOUBLE: - recordConsumer.addDouble((double) val); - break; - case INT32: - recordConsumer.addInteger((int) val); - break; - case INT64: - recordConsumer.addLong((long) val); - break; - case BINARY: - Binary binVal = - val instanceof String - ?
Binary.fromString((String) val) - : Binary.fromConstantByteArray((byte[]) val); - recordConsumer.addBinary(binVal); - break; - case FIXED_LEN_BYTE_ARRAY: - Binary binary = Binary.fromConstantByteArray((byte[]) val); - recordConsumer.addBinary(binary); - break; - default: - throw new ParquetEncodingException( - "Unsupported column type: " + cols.get(i).getPrimitiveType()); + if (cols.get(i).isPrimitive()) { + PrimitiveType.PrimitiveTypeName typeName = + cols.get(i).asPrimitiveType().getPrimitiveTypeName(); + switch (typeName) { + case BOOLEAN: + recordConsumer.addBoolean((boolean) val); + break; + case FLOAT: + recordConsumer.addFloat((float) val); + break; + case DOUBLE: + recordConsumer.addDouble((double) val); + break; + case INT32: + recordConsumer.addInteger((int) val); + break; + case INT64: + recordConsumer.addLong((long) val); + break; + case BINARY: + Binary binVal = + val instanceof String + ? Binary.fromString((String) val) + : Binary.fromConstantByteArray((byte[]) val); + recordConsumer.addBinary(binVal); + break; + case FIXED_LEN_BYTE_ARRAY: + Binary binary = Binary.fromConstantByteArray((byte[]) val); + recordConsumer.addBinary(binary); + break; + default: + throw new ParquetEncodingException( + "Unsupported column type: " + cols.get(i).asPrimitiveType()); + } + } else { + throw new ParquetEncodingException("Unsupported column type: " + cols.get(i)); } recordConsumer.endField(fieldName, i); } } - recordConsumer.endMessage(); } } } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java index 62a6b7a4b..1cc3a2953 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java @@ -96,7 +96,7 @@ private List<List<ChannelData<ParquetChunkData>>> createChannelDataPerTable(int metada } private static MessageType createSchema(String columnName) { - ParquetTypeGenerator.ParquetTypeInfo c1 = + ParquetTypeInfo c1 = ParquetTypeGenerator.generateColumnParquetTypeInfo(createTestTextColumn(columnName), 1); return new MessageType("bdec", Collections.singletonList(c1.getParquetType())); } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/ColumnMetadataBuilder.java b/src/test/java/net/snowflake/ingest/streaming/internal/ColumnMetadataBuilder.java index 01fd44918..4ad62f0f7 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/ColumnMetadataBuilder.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/ColumnMetadataBuilder.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022 Snowflake Computing Inc. All rights reserved. + * Copyright (c) 2022-2024 Snowflake Computing Inc. All rights reserved.
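
A note on the writeValues() refactor above: the removed code iterated flat leaf ColumnDescriptors, where getPath()[0] is the top-level field name, while the new code walks the schema tree one level at a time, so each field's own getName() is the correct label and recursing into group types becomes possible. A sketch of the difference (schema and names invented):

import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Types;

class FieldNameSketch {
  static void demo() {
    MessageType schema =
        Types.buildMessage()
            .optionalGroup()
                .optional(PrimitiveTypeName.INT32)
                .named("inner")
            .named("outer")
            .named("bdec");
    // Leaf-descriptor view: schema.getColumns().get(0).getPath() is
    // ["outer", "inner"], so getPath()[0] is "outer" even for the nested leaf.
    // Tree view: schema.getFields().get(0).getName() is "outer", and recursing
    // into that group yields a field whose getName() is "inner".
  }
}
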
*/ package net.snowflake.ingest.streaming.internal; @@ -16,6 +16,7 @@ public class ColumnMetadataBuilder { private Integer length; private boolean nullable; private String collation; + private String sourceIcebergDataType; private Integer ordinal; @@ -144,6 +145,17 @@ public ColumnMetadataBuilder collation(String collation) { return this; } + /** + * Set column source Iceberg data type + * + * @param sourceIcebergDataType source Iceberg data type string + * @return columnMetadataBuilder object + */ + public ColumnMetadataBuilder sourceIcebergDataType(String sourceIcebergDataType) { + this.sourceIcebergDataType = sourceIcebergDataType; + return this; + } + /** * Set column ordinal * @@ -172,6 +184,7 @@ public ColumnMetadata build() { colMetadata.setScale(scale); colMetadata.setPrecision(precision); colMetadata.setCollation(collation); + colMetadata.setSourceIcebergDataType(sourceIcebergDataType); colMetadata.setOrdinal(ordinal); return colMetadata; } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/DataValidationUtilTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/DataValidationUtilTest.java index b92cc6e5e..0e738a4b3 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/DataValidationUtilTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/DataValidationUtilTest.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + package net.snowflake.ingest.streaming.internal; import static java.time.ZoneOffset.UTC; @@ -11,6 +15,8 @@ import static net.snowflake.ingest.streaming.internal.DataValidationUtil.validateAndParseBinary; import static net.snowflake.ingest.streaming.internal.DataValidationUtil.validateAndParseBoolean; import static net.snowflake.ingest.streaming.internal.DataValidationUtil.validateAndParseDate; +import static net.snowflake.ingest.streaming.internal.DataValidationUtil.validateAndParseIcebergInt; +import static net.snowflake.ingest.streaming.internal.DataValidationUtil.validateAndParseIcebergLong; import static net.snowflake.ingest.streaming.internal.DataValidationUtil.validateAndParseObject; import static net.snowflake.ingest.streaming.internal.DataValidationUtil.validateAndParseObjectNew; import static net.snowflake.ingest.streaming.internal.DataValidationUtil.validateAndParseReal; @@ -1190,6 +1196,91 @@ public void testValidateAndParseBoolean() { expectError(ErrorCode.INVALID_VALUE_ROW, () -> validateAndParseBoolean("COL", "", 0)); } + @Test + public void testValidateAndParseIcebergInt() { + assertEquals(1, validateAndParseIcebergInt("COL", 1, 0)); + assertEquals(1, validateAndParseIcebergInt("COL", 1L, 0)); + assertEquals(1, validateAndParseIcebergInt("COL", 1.499f, 0)); + assertEquals(0, validateAndParseIcebergInt("COL", -.0f, 0)); + assertEquals(1, validateAndParseIcebergInt("COL", 1.0d, 0)); + assertEquals(1, validateAndParseIcebergInt("COL", "1", 0)); + assertEquals(1, validateAndParseIcebergInt("COL", " 1 \t\n", 0)); + assertEquals(1, validateAndParseIcebergInt("COL", "0.5", 0)); + assertEquals(1, validateAndParseIcebergInt("COL", "1.0e0", 0)); + assertEquals(1, validateAndParseIcebergInt("COL", "1.0e+0", 0)); + assertEquals(1, validateAndParseIcebergInt("COL", "1.0e-0", 0)); + assertEquals(10, validateAndParseIcebergInt("COL", "1.0e1", 0)); + assertEquals(10, validateAndParseIcebergInt("COL", "1.0e+1", 0)); + assertEquals(0, validateAndParseIcebergInt("COL", "1.0e-1", 0)); + assertEquals(1, validateAndParseIcebergInt("COL", new BigDecimal("1.4"), 0)); +
assertEquals(Integer.MAX_VALUE, validateAndParseIcebergInt("COL", "2147483647.499", 0)); + assertEquals(Integer.MIN_VALUE, validateAndParseIcebergInt("COL", "-2147483648.499", 0)); + + // Test forbidden values + expectError(ErrorCode.INVALID_FORMAT_ROW, () -> validateAndParseIcebergInt("COL", 'c', 0)); + expectError( + ErrorCode.INVALID_FORMAT_ROW, () -> validateAndParseIcebergInt("COL", new Object(), 0)); + expectError( + ErrorCode.INVALID_FORMAT_ROW, () -> validateAndParseIcebergInt("COL", new int[] {}, 0)); + expectError(ErrorCode.INVALID_VALUE_ROW, () -> validateAndParseIcebergInt("COL", "foo", 0)); + expectError( + ErrorCode.INVALID_VALUE_ROW, () -> validateAndParseIcebergInt("COL", "2147483647.5", 0)); + expectError( + ErrorCode.INVALID_VALUE_ROW, + () -> validateAndParseIcebergInt("COL", new BigDecimal("-2147483648.5"), 0)); + expectError( + ErrorCode.INVALID_VALUE_ROW, () -> validateAndParseIcebergInt("COL", Double.NaN, 0)); + expectError( + ErrorCode.INVALID_VALUE_ROW, + () -> validateAndParseIcebergInt("COL", Double.POSITIVE_INFINITY, 0)); + expectError( + ErrorCode.INVALID_VALUE_ROW, + () -> validateAndParseIcebergInt("COL", Float.NEGATIVE_INFINITY, 0)); + } + + @Test + public void testValidateAndParseIcebergLong() { + assertEquals(1L, validateAndParseIcebergLong("COL", 1, 0)); + assertEquals(1L, validateAndParseIcebergLong("COL", 1L, 0)); + assertEquals(1L, validateAndParseIcebergLong("COL", 1.499f, 0)); + assertEquals(0L, validateAndParseIcebergLong("COL", -.0f, 0)); + assertEquals(1L, validateAndParseIcebergLong("COL", 1.0d, 0)); + assertEquals(1L, validateAndParseIcebergLong("COL", "1", 0)); + assertEquals(1L, validateAndParseIcebergLong("COL", " 1 \t\n", 0)); + assertEquals(1L, validateAndParseIcebergLong("COL", "0.5", 0)); + assertEquals(1L, validateAndParseIcebergLong("COL", "1.0e0", 0)); + assertEquals(1L, validateAndParseIcebergLong("COL", "1.0e+0", 0)); + assertEquals(1L, validateAndParseIcebergLong("COL", "1.0e-0", 0)); + assertEquals(10L, validateAndParseIcebergLong("COL", "1.0e1", 0)); + assertEquals(10L, validateAndParseIcebergLong("COL", "1.0e+1", 0)); + assertEquals(0L, validateAndParseIcebergLong("COL", "1.0e-1", 0)); + assertEquals(1L, validateAndParseIcebergLong("COL", new BigDecimal("1.4"), 0)); + assertEquals(Long.MAX_VALUE, validateAndParseIcebergLong("COL", "9223372036854775807.499", 0)); + assertEquals(Long.MIN_VALUE, validateAndParseIcebergLong("COL", "-9223372036854775808.499", 0)); + + // Test forbidden values + expectError(ErrorCode.INVALID_FORMAT_ROW, () -> validateAndParseIcebergLong("COL", 'c', 0)); + expectError( + ErrorCode.INVALID_FORMAT_ROW, () -> validateAndParseIcebergLong("COL", new Object(), 0)); + expectError( + ErrorCode.INVALID_FORMAT_ROW, () -> validateAndParseIcebergLong("COL", new int[] {}, 0)); + expectError(ErrorCode.INVALID_VALUE_ROW, () -> validateAndParseIcebergLong("COL", "foo", 0)); + expectError( + ErrorCode.INVALID_VALUE_ROW, + () -> validateAndParseIcebergLong("COL", "9223372036854775807.5", 0)); + expectError( + ErrorCode.INVALID_VALUE_ROW, + () -> validateAndParseIcebergLong("COL", new BigDecimal("-9223372036854775808.5"), 0)); + expectError( + ErrorCode.INVALID_VALUE_ROW, () -> validateAndParseIcebergLong("COL", Float.NaN, 0)); + expectError( + ErrorCode.INVALID_VALUE_ROW, + () -> validateAndParseIcebergLong("COL", Float.POSITIVE_INFINITY, 0)); + expectError( + ErrorCode.INVALID_VALUE_ROW, + () -> validateAndParseIcebergLong("COL", Double.NEGATIVE_INFINITY, 0)); + } + /** * Tests that exception messages are constructed
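
The boundary assertions above are consistent with half-up rounding applied before the range check; a sketch reproducing the expected values (RoundingMode.HALF_UP is inferred from the assertions, not stated by the API):

import java.math.BigDecimal;
import java.math.RoundingMode;

class IcebergIntBoundsSketch {
  static void demo() {
    // "2147483647.499" rounds to 2147483647 == Integer.MAX_VALUE, so it is
    // accepted; "2147483647.5" rounds to 2147483648, which no longer fits.
    long ok =
        new BigDecimal("2147483647.499").setScale(0, RoundingMode.HALF_UP).longValueExact();
    long overflow =
        new BigDecimal("2147483647.5").setScale(0, RoundingMode.HALF_UP).longValueExact();
    // ok == 2_147_483_647L; overflow == 2_147_483_648L > Integer.MAX_VALUE
  }
}
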
correctly when ingesting a forbidden Java type, as well as a value of an allowed type but in an invalid format diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/IcebergDataTypeParserTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/IcebergDataTypeParserTest.java new file mode 100644 index 000000000..e12d2f4f7 --- /dev/null +++ b/src/test/java/net/snowflake/ingest/streaming/internal/IcebergDataTypeParserTest.java @@ -0,0 +1,107 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + +package net.snowflake.ingest.streaming.internal; + +import java.util.ArrayList; +import java.util.List; +import net.snowflake.ingest.utils.IcebergDataTypeParser; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** Test for Iceberg data type serialization and deserialization. */ +public class IcebergDataTypeParserTest { + private class DataTypeInfo { + + // Json representation of Iceberg data type + String jsonStr; + Type icebergType; + + DataTypeInfo(String jsonStr, Type icebergType) { + this.jsonStr = jsonStr; + this.icebergType = icebergType; + } + } + + private int fieldId = 0; + + List<DataTypeInfo> dataTypesToTest; + + @Before + public void setup() { + fieldId = 0; + + // Create an Iceberg data type information list with primitive types and nested types. + dataTypesToTest = new ArrayList<>(); + dataTypesToTest.add(new DataTypeInfo("\"boolean\"", Types.BooleanType.get())); + dataTypesToTest.add(new DataTypeInfo("\"int\"", Types.IntegerType.get())); + dataTypesToTest.add(new DataTypeInfo("\"long\"", Types.LongType.get())); + dataTypesToTest.add(new DataTypeInfo("\"float\"", Types.FloatType.get())); + dataTypesToTest.add(new DataTypeInfo("\"double\"", Types.DoubleType.get())); + dataTypesToTest.add(new DataTypeInfo("\"string\"", Types.StringType.get())); + dataTypesToTest.add(new DataTypeInfo("\"date\"", Types.DateType.get())); + dataTypesToTest.add(new DataTypeInfo("\"time\"", Types.TimeType.get())); + dataTypesToTest.add(new DataTypeInfo("\"timestamptz\"", Types.TimestampType.withZone())); + dataTypesToTest.add( + new DataTypeInfo( + "{\"type\":\"struct\",\"fields\":[{\"id\":1,\"name\":\"first\",\"required\":false,\"type\":\"int\"},{\"id\":2,\"name\":\"second\",\"required\":false,\"type\":\"int\"}]}", + Types.StructType.of( + Types.NestedField.optional(generateFieldId(), "first", Types.IntegerType.get()), + Types.NestedField.optional(generateFieldId(), "second", Types.IntegerType.get())))); + dataTypesToTest.add( + new DataTypeInfo( + "{\"type\":\"list\",\"element-id\":3,\"element\":\"int\",\"element-required\":false}", + Types.ListType.ofOptional(generateFieldId(), Types.IntegerType.get()))); + dataTypesToTest.add( + new DataTypeInfo( + "{\"type\":\"map\",\"key-id\":4,\"key\":\"int\",\"value-id\":5,\"value\":\"string\",\"value-required\":false}", + Types.MapType.ofOptional( + generateFieldId(), + generateFieldId(), + Types.IntegerType.get(), + Types.StringType.get()))); + } + + /** Helper function to generate a unique fieldId for nested types */ + private int generateFieldId() { + fieldId++; + return fieldId; + } + + /** Test for Iceberg data type deserialization.
*/ + @Test + public void testDeserializeIcebergType() { + for (int i = 0; i < dataTypesToTest.size(); i++) { + DataTypeInfo typeInfo = dataTypesToTest.get(i); + Type dataType = IcebergDataTypeParser.deserializeIcebergType(typeInfo.jsonStr); + Assert.assertEquals(typeInfo.icebergType, dataType); + } + } + + @Test + public void testDeserializeIcebergTypeFailed() { + String json = "bad json"; + IllegalArgumentException exception = + Assert.assertThrows( + IllegalArgumentException.class, + () -> IcebergDataTypeParser.deserializeIcebergType(json)); + Assert.assertEquals( + "Failed to deserialize Iceberg data type: bad json", exception.getMessage()); + } + + @Test + public void testUnsupportedIcebergType() { + String json = "{\"type\":\"unsupported\"}"; + IllegalArgumentException exception = + Assert.assertThrows( + IllegalArgumentException.class, + () -> IcebergDataTypeParser.deserializeIcebergType(json)); + Assert.assertEquals( + "Cannot parse Iceberg type: unsupported, schema: {\"type\":\"unsupported\"}", + exception.getMessage()); + } +} diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParserTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParserTest.java new file mode 100644 index 000000000..a0b4caa1c --- /dev/null +++ b/src/test/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParserTest.java @@ -0,0 +1,325 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + +package net.snowflake.ingest.streaming.internal; + +import static java.time.ZoneOffset.UTC; +import static net.snowflake.ingest.streaming.internal.ParquetBufferValue.BYTE_ARRAY_LENGTH_ENCODING_BYTE_LEN; +import static net.snowflake.ingest.streaming.internal.ParquetBufferValue.DEFINITION_LEVEL_ENCODING_BYTE_LEN; + +import java.math.BigDecimal; +import java.math.BigInteger; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; +import org.apache.parquet.schema.Type; +import org.apache.parquet.schema.Type.Repetition; +import org.apache.parquet.schema.Types; +import org.junit.Test; + +public class IcebergParquetValueParserTest { + + @Test + public void parseValueBoolean() { + Type type = + Types.primitive(PrimitiveTypeName.BOOLEAN, Repetition.OPTIONAL).named("BOOLEAN_COL"); + + RowBufferStats rowBufferStats = new RowBufferStats("BOOLEAN_COL"); + ParquetBufferValue pv = + IcebergParquetValueParser.parseColumnValueToParquet(true, type, rowBufferStats, UTC, 0); + ParquetValueParserAssertionBuilder.newBuilder() + .parquetBufferValue(pv) + .rowBufferStats(rowBufferStats) + .expectedValueClass(Boolean.class) + .expectedParsedValue(true) + .expectedSize(ParquetBufferValue.BIT_ENCODING_BYTE_LEN + DEFINITION_LEVEL_ENCODING_BYTE_LEN) + .expectedMinMax(BigInteger.valueOf(1)) + .assertMatches(); + } + + @Test + public void parseValueInt() { + Type type = Types.primitive(PrimitiveTypeName.INT32, Repetition.OPTIONAL).named("INT_COL"); + + RowBufferStats rowBufferStats = new RowBufferStats("INT_COL"); + ParquetBufferValue pv = + IcebergParquetValueParser.parseColumnValueToParquet( + Integer.MAX_VALUE, type, rowBufferStats, UTC, 0); + ParquetValueParserAssertionBuilder.newBuilder() + .parquetBufferValue(pv) + .rowBufferStats(rowBufferStats) + .expectedValueClass(Integer.class) + .expectedParsedValue(Integer.MAX_VALUE) + .expectedSize(4.0f + DEFINITION_LEVEL_ENCODING_BYTE_LEN) + .expectedMinMax(BigInteger.valueOf(Integer.MAX_VALUE)) + .assertMatches(); + } + + @Test + 
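
A reading aid for the decimal tests below: LogicalTypeAnnotation.decimalType takes (scale, precision), so decimalType(4, 9) declares DECIMAL(9,4), and the buffered value is the unscaled integer. A worked check of the first expected value:

import java.math.BigDecimal;

class DecimalUnscaledSketch {
  static void demo() {
    // 12345.6789 at scale 4 has unscaled value 123456789, which is what the
    // INT32-backed decimal column stores and what the test asserts.
    BigDecimal v = new BigDecimal("12345.6789");
    int unscaled = v.unscaledValue().intValueExact(); // 123456789
  }
}
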
public void parseValueDecimalToInt() { + Type type = + Types.primitive(PrimitiveTypeName.INT32, Repetition.OPTIONAL) + .as(LogicalTypeAnnotation.decimalType(4, 9)) + .named("DECIMAL_COL"); + + RowBufferStats rowBufferStats = new RowBufferStats("DECIMAL_COL"); + ParquetBufferValue pv = + IcebergParquetValueParser.parseColumnValueToParquet( + new BigDecimal("12345.6789"), type, rowBufferStats, UTC, 0); + ParquetValueParserAssertionBuilder.newBuilder() + .parquetBufferValue(pv) + .rowBufferStats(rowBufferStats) + .expectedValueClass(Integer.class) + .expectedParsedValue(123456789) + .expectedSize(4.0f + DEFINITION_LEVEL_ENCODING_BYTE_LEN) + .expectedMinMax(BigInteger.valueOf(123456789)) + .assertMatches(); + } + + @Test + public void parseValueDateToInt() { + Type type = + Types.primitive(PrimitiveTypeName.INT32, Repetition.OPTIONAL) + .as(LogicalTypeAnnotation.dateType()) + .named("DATE_COL"); + + RowBufferStats rowBufferStats = new RowBufferStats("DATE_COL"); + ParquetBufferValue pv = + IcebergParquetValueParser.parseColumnValueToParquet( + "2024-01-01", type, rowBufferStats, UTC, 0); + ParquetValueParserAssertionBuilder.newBuilder() + .parquetBufferValue(pv) + .rowBufferStats(rowBufferStats) + .expectedValueClass(Integer.class) + .expectedParsedValue(19723) + .expectedSize(4.0f + DEFINITION_LEVEL_ENCODING_BYTE_LEN) + .expectedMinMax(BigInteger.valueOf(19723)) + .assertMatches(); + } + + @Test + public void parseValueLong() { + Type type = Types.primitive(PrimitiveTypeName.INT64, Repetition.OPTIONAL).named("LONG_COL"); + + RowBufferStats rowBufferStats = new RowBufferStats("LONG_COL"); + ParquetBufferValue pv = + IcebergParquetValueParser.parseColumnValueToParquet( + Long.MAX_VALUE, type, rowBufferStats, UTC, 0); + ParquetValueParserAssertionBuilder.newBuilder() + .parquetBufferValue(pv) + .rowBufferStats(rowBufferStats) + .expectedValueClass(Long.class) + .expectedParsedValue(Long.MAX_VALUE) + .expectedSize(8.0f + DEFINITION_LEVEL_ENCODING_BYTE_LEN) + .expectedMinMax(BigInteger.valueOf(Long.MAX_VALUE)) + .assertMatches(); + } + + @Test + public void parseValueDecimalToLong() { + Type type = + Types.primitive(PrimitiveTypeName.INT64, Repetition.OPTIONAL) + .as(LogicalTypeAnnotation.decimalType(9, 18)) + .named("DECIMAL_COL"); + + RowBufferStats rowBufferStats = new RowBufferStats("DECIMAL_COL"); + ParquetBufferValue pv = + IcebergParquetValueParser.parseColumnValueToParquet( + new BigDecimal("123456789.123456789"), type, rowBufferStats, UTC, 0); + ParquetValueParserAssertionBuilder.newBuilder() + .parquetBufferValue(pv) + .rowBufferStats(rowBufferStats) + .expectedValueClass(Long.class) + .expectedParsedValue(123456789123456789L) + .expectedSize(8.0f + DEFINITION_LEVEL_ENCODING_BYTE_LEN) + .expectedMinMax(BigInteger.valueOf(123456789123456789L)) + .assertMatches(); + } + + @Test + public void parseValueTimeToLong() { + Type type = + Types.primitive(PrimitiveTypeName.INT64, Repetition.OPTIONAL) + .as(LogicalTypeAnnotation.timeType(false, LogicalTypeAnnotation.TimeUnit.MICROS)) + .named("TIME_COL"); + + RowBufferStats rowBufferStats = new RowBufferStats("TIME_COL"); + ParquetBufferValue pv = + IcebergParquetValueParser.parseColumnValueToParquet( + "12:34:56.789", type, rowBufferStats, UTC, 0); + ParquetValueParserAssertionBuilder.newBuilder() + .parquetBufferValue(pv) + .rowBufferStats(rowBufferStats) + .expectedValueClass(Long.class) + .expectedParsedValue(45296789000L) + .expectedSize(8.0f + DEFINITION_LEVEL_ENCODING_BYTE_LEN) + .expectedMinMax(BigInteger.valueOf(45296789000L)) + 
.assertMatches(); + } + + @Test + public void parseValueTimestampToLong() { + Type type = + Types.primitive(PrimitiveTypeName.INT64, Repetition.OPTIONAL) + .as(LogicalTypeAnnotation.timestampType(false, LogicalTypeAnnotation.TimeUnit.MICROS)) + .named("TIMESTAMP_COL"); + + RowBufferStats rowBufferStats = new RowBufferStats("TIMESTAMP_COL"); + ParquetBufferValue pv = + IcebergParquetValueParser.parseColumnValueToParquet( + "2024-01-01T12:34:56.789+08:00", type, rowBufferStats, UTC, 0); + ParquetValueParserAssertionBuilder.newBuilder() + .parquetBufferValue(pv) + .rowBufferStats(rowBufferStats) + .expectedValueClass(Long.class) + .expectedParsedValue(1704112496789000L) + .expectedSize(8.0f + DEFINITION_LEVEL_ENCODING_BYTE_LEN) + .expectedMinMax(BigInteger.valueOf(1704112496789000L)) + .assertMatches(); + } + + @Test + public void parseValueTimestampTZToLong() { + Type type = + Types.primitive(PrimitiveTypeName.INT64, Repetition.OPTIONAL) + .as(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MICROS)) + .named("TIMESTAMP_TZ_COL"); + + RowBufferStats rowBufferStats = new RowBufferStats("TIMESTAMP_TZ_COL"); + ParquetBufferValue pv = + IcebergParquetValueParser.parseColumnValueToParquet( + "2024-01-01T12:34:56.789+08:00", type, rowBufferStats, UTC, 0); + ParquetValueParserAssertionBuilder.newBuilder() + .parquetBufferValue(pv) + .rowBufferStats(rowBufferStats) + .expectedValueClass(Long.class) + .expectedParsedValue(1704083696789000L) + .expectedSize(8.0f + DEFINITION_LEVEL_ENCODING_BYTE_LEN) + .expectedMinMax(BigInteger.valueOf(1704083696789000L)) + .assertMatches(); + } + + @Test + public void parseValueFloat() { + Type type = Types.primitive(PrimitiveTypeName.FLOAT, Repetition.OPTIONAL).named("FLOAT_COL"); + + RowBufferStats rowBufferStats = new RowBufferStats("FLOAT_COL"); + ParquetBufferValue pv = + IcebergParquetValueParser.parseColumnValueToParquet( + Float.MAX_VALUE, type, rowBufferStats, UTC, 0); + ParquetValueParserAssertionBuilder.newBuilder() + .parquetBufferValue(pv) + .rowBufferStats(rowBufferStats) + .expectedValueClass(Float.class) + .expectedParsedValue(Float.MAX_VALUE) + .expectedSize(4.0f + DEFINITION_LEVEL_ENCODING_BYTE_LEN) + .expectedMinMax((double) Float.MAX_VALUE) + .assertMatches(); + } + + @Test + public void parseValueDouble() { + Type type = Types.primitive(PrimitiveTypeName.DOUBLE, Repetition.OPTIONAL).named("DOUBLE_COL"); + + RowBufferStats rowBufferStats = new RowBufferStats("DOUBLE_COL"); + ParquetBufferValue pv = + IcebergParquetValueParser.parseColumnValueToParquet( + Double.MAX_VALUE, type, rowBufferStats, UTC, 0); + ParquetValueParserAssertionBuilder.newBuilder() + .parquetBufferValue(pv) + .rowBufferStats(rowBufferStats) + .expectedValueClass(Double.class) + .expectedParsedValue(Double.MAX_VALUE) + .expectedSize(8.0f + DEFINITION_LEVEL_ENCODING_BYTE_LEN) + .expectedMinMax(Double.MAX_VALUE) + .assertMatches(); + } + + @Test + public void parseValueBinary() { + Type type = Types.primitive(PrimitiveTypeName.BINARY, Repetition.OPTIONAL).named("BINARY_COL"); + + RowBufferStats rowBufferStats = new RowBufferStats("BINARY_COL"); + byte[] value = "snowflake_to_the_moon".getBytes(); + ParquetBufferValue pv = + IcebergParquetValueParser.parseColumnValueToParquet(value, type, rowBufferStats, UTC, 0); + ParquetValueParserAssertionBuilder.newBuilder() + .parquetBufferValue(pv) + .rowBufferStats(rowBufferStats) + .expectedValueClass(byte[].class) + .expectedParsedValue(value) + .expectedSize( + value.length + DEFINITION_LEVEL_ENCODING_BYTE_LEN + 
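
A worked check of the temporal literals above (arithmetic only; LocalDate is mentioned for reference, not called by the tests):

class TemporalMicrosSketch {
  static void demo() {
    // Date: 2024-01-01 is 19723 days after 1970-01-01,
    // i.e. LocalDate.parse("2024-01-01").toEpochDay() == 19723.
    long epochDay = 19_723L;

    // Time: "12:34:56.789" in microseconds of the day.
    long timeMicros = ((12 * 3600 + 34 * 60 + 56) * 1_000_000L) + 789_000L; // 45_296_789_000L

    // Timestamp: both tests parse "2024-01-01T12:34:56.789+08:00". The
    // isAdjustedToUTC=false column keeps the wall-clock reading, the
    // isAdjustedToUTC=true column normalizes to UTC, 8 hours earlier.
    long wallClockMicros = 1_704_112_496_789_000L;
    long utcMicros = 1_704_083_696_789_000L;
    long diff = wallClockMicros - utcMicros; // 28_800_000_000L == 8h in micros
  }
}
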
BYTE_ARRAY_LENGTH_ENCODING_BYTE_LEN) + .expectedMinMax(value) + .assertMatches(); + } + + @Test + public void parseValueStringToBinary() { + Type type = + Types.primitive(PrimitiveTypeName.BINARY, Repetition.OPTIONAL) + .as(LogicalTypeAnnotation.stringType()) + .named("BINARY_COL"); + + RowBufferStats rowBufferStats = new RowBufferStats("BINARY_COL"); + String value = "snowflake_to_the_moon"; + ParquetBufferValue pv = + IcebergParquetValueParser.parseColumnValueToParquet(value, type, rowBufferStats, UTC, 0); + ParquetValueParserAssertionBuilder.newBuilder() + .parquetBufferValue(pv) + .rowBufferStats(rowBufferStats) + .expectedValueClass(byte[].class) + .expectedParsedValue(value.getBytes()) + .expectedSize( + value.getBytes().length + + DEFINITION_LEVEL_ENCODING_BYTE_LEN + + BYTE_ARRAY_LENGTH_ENCODING_BYTE_LEN) + .expectedMinMax(value.getBytes()) + .assertMatches(); + } + + @Test + public void parseValueFixed() { + Type type = + Types.primitive(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, Repetition.OPTIONAL) + .length(4) + .named("FIXED_COL"); + + RowBufferStats rowBufferStats = new RowBufferStats("FIXED_COL"); + byte[] value = "snow".getBytes(); + ParquetBufferValue pv = + IcebergParquetValueParser.parseColumnValueToParquet(value, type, rowBufferStats, UTC, 0); + ParquetValueParserAssertionBuilder.newBuilder() + .parquetBufferValue(pv) + .rowBufferStats(rowBufferStats) + .expectedValueClass(byte[].class) + .expectedParsedValue(value) + .expectedSize( + 4.0f + DEFINITION_LEVEL_ENCODING_BYTE_LEN + BYTE_ARRAY_LENGTH_ENCODING_BYTE_LEN) + .expectedMinMax(value) + .assertMatches(); + } + + @Test + public void parseValueDecimalToFixed() { + Type type = + Types.primitive(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, Repetition.OPTIONAL) + .length(9) + .as(LogicalTypeAnnotation.decimalType(10, 20)) + .named("FIXED_COL"); + + RowBufferStats rowBufferStats = new RowBufferStats("FIXED_COL"); + BigDecimal value = new BigDecimal("1234567890.0123456789"); + ParquetBufferValue pv = + IcebergParquetValueParser.parseColumnValueToParquet(value, type, rowBufferStats, UTC, 0); + ParquetValueParserAssertionBuilder.newBuilder() + .parquetBufferValue(pv) + .rowBufferStats(rowBufferStats) + .expectedValueClass(byte[].class) + .expectedParsedValue(value.unscaledValue().toByteArray()) + .expectedSize( + 9.0f + DEFINITION_LEVEL_ENCODING_BYTE_LEN + BYTE_ARRAY_LENGTH_ENCODING_BYTE_LEN) + .expectedMinMax(value.unscaledValue()) + .assertMatches(); + } +} diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/ParquetTypeGeneratorTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/ParquetTypeGeneratorTest.java index 3356ab277..c83d339c1 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/ParquetTypeGeneratorTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/ParquetTypeGeneratorTest.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. 
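
The decimal-to-fixed test above stores the unscaled value as a big-endian two's-complement byte array; a worked check of the 9-byte expectation:

import java.math.BigDecimal;

class DecimalFixedSketch {
  static void demo() {
    BigDecimal v = new BigDecimal("1234567890.0123456789");
    // The unscaled value 12_345_678_900_123_456_789 exceeds Long.MAX_VALUE
    // (about 9.22e18), so it needs 65 bits including the sign -> 9 bytes,
    // matching the FIXED_LEN_BYTE_ARRAY(9) column and the expected size.
    byte[] bytes = v.unscaledValue().toByteArray(); // bytes.length == 9
  }
}
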
+ */ + package net.snowflake.ingest.streaming.internal; import java.util.Map; @@ -19,8 +23,7 @@ public void buildFieldFixedSB1() { .nullable(true) .build(); - ParquetTypeGenerator.ParquetTypeInfo typeInfo = - ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); + ParquetTypeInfo typeInfo = ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); createParquetTypeInfoAssertionBuilder() .typeInfo(typeInfo) .expectedFieldName("TESTCOL") @@ -45,8 +48,7 @@ public void buildFieldFixedSB2() { .nullable(false) .build(); - ParquetTypeGenerator.ParquetTypeInfo typeInfo = - ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); + ParquetTypeInfo typeInfo = ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); createParquetTypeInfoAssertionBuilder() .typeInfo(typeInfo) .expectedFieldName("TESTCOL") @@ -71,8 +73,7 @@ public void buildFieldFixedSB4() { .nullable(true) .build(); - ParquetTypeGenerator.ParquetTypeInfo typeInfo = - ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); + ParquetTypeInfo typeInfo = ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); createParquetTypeInfoAssertionBuilder() .typeInfo(typeInfo) .expectedFieldName("TESTCOL") @@ -97,8 +98,7 @@ public void buildFieldFixedSB8() { .nullable(true) .build(); - ParquetTypeGenerator.ParquetTypeInfo typeInfo = - ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); + ParquetTypeInfo typeInfo = ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); createParquetTypeInfoAssertionBuilder() .typeInfo(typeInfo) .expectedFieldName("TESTCOL") @@ -123,8 +123,7 @@ public void buildFieldFixedSB16() { .nullable(true) .build(); - ParquetTypeGenerator.ParquetTypeInfo typeInfo = - ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); + ParquetTypeInfo typeInfo = ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); createParquetTypeInfoAssertionBuilder() .typeInfo(typeInfo) .expectedFieldName("TESTCOL") @@ -149,8 +148,7 @@ public void buildFieldLobVariant() { .nullable(true) .build(); - ParquetTypeGenerator.ParquetTypeInfo typeInfo = - ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); + ParquetTypeInfo typeInfo = ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); createParquetTypeInfoAssertionBuilder() .typeInfo(typeInfo) .expectedFieldName("TESTCOL") @@ -176,8 +174,7 @@ public void buildFieldTimestampNtzSB8() { .nullable(true) .build(); - ParquetTypeGenerator.ParquetTypeInfo typeInfo = - ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); + ParquetTypeInfo typeInfo = ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); createParquetTypeInfoAssertionBuilder() .typeInfo(typeInfo) .expectedFieldName("TESTCOL") @@ -201,8 +198,7 @@ public void buildFieldTimestampNtzSB16() { .nullable(true) .build(); - ParquetTypeGenerator.ParquetTypeInfo typeInfo = - ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); + ParquetTypeInfo typeInfo = ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); createParquetTypeInfoAssertionBuilder() .typeInfo(typeInfo) .expectedFieldName("TESTCOL") @@ -226,8 +222,7 @@ public void buildFieldTimestampTzSB8() { .nullable(true) .build(); - ParquetTypeGenerator.ParquetTypeInfo typeInfo = - ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); + ParquetTypeInfo typeInfo = ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); createParquetTypeInfoAssertionBuilder() .typeInfo(typeInfo) .expectedFieldName("TESTCOL") @@ -251,8 +246,7 @@ public void 
buildFieldTimestampTzSB16() { .nullable(true) .build(); - ParquetTypeGenerator.ParquetTypeInfo typeInfo = - ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); + ParquetTypeInfo typeInfo = ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); createParquetTypeInfoAssertionBuilder() .typeInfo(typeInfo) .expectedFieldName("TESTCOL") @@ -276,8 +270,7 @@ public void buildFieldDate() { .nullable(true) .build(); - ParquetTypeGenerator.ParquetTypeInfo typeInfo = - ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); + ParquetTypeInfo typeInfo = ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); createParquetTypeInfoAssertionBuilder() .typeInfo(typeInfo) .expectedFieldName("TESTCOL") @@ -301,8 +294,7 @@ public void buildFieldTimeSB4() { .nullable(true) .build(); - ParquetTypeGenerator.ParquetTypeInfo typeInfo = - ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); + ParquetTypeInfo typeInfo = ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); createParquetTypeInfoAssertionBuilder() .typeInfo(typeInfo) .expectedFieldName("TESTCOL") @@ -326,8 +318,7 @@ public void buildFieldTimeSB8() { .nullable(true) .build(); - ParquetTypeGenerator.ParquetTypeInfo typeInfo = - ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); + ParquetTypeInfo typeInfo = ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); createParquetTypeInfoAssertionBuilder() .typeInfo(typeInfo) .expectedFieldName("TESTCOL") @@ -351,8 +342,7 @@ public void buildFieldBoolean() { .nullable(true) .build(); - ParquetTypeGenerator.ParquetTypeInfo typeInfo = - ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); + ParquetTypeInfo typeInfo = ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); createParquetTypeInfoAssertionBuilder() .typeInfo(typeInfo) .expectedFieldName("TESTCOL") @@ -376,8 +366,7 @@ public void buildFieldRealSB16() { .nullable(true) .build(); - ParquetTypeGenerator.ParquetTypeInfo typeInfo = - ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); + ParquetTypeInfo typeInfo = ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); createParquetTypeInfoAssertionBuilder() .typeInfo(typeInfo) .expectedFieldName("TESTCOL") @@ -392,23 +381,336 @@ public void buildFieldRealSB16() { .assertMatches(); } + @Test + public void buildFieldIcebergBoolean() { + ColumnMetadata testCol = + createColumnMetadataBuilder() + .logicalType("") + .sourceIcebergDataType("\"boolean\"") + .nullable(true) + .build(); + + ParquetTypeInfo typeInfo = ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); + createParquetTypeInfoAssertionBuilder() + .typeInfo(typeInfo) + .expectedFieldName("TESTCOL") + .expectedTypeLength(0) + .expectedPrimitiveTypeName(PrimitiveType.PrimitiveTypeName.BOOLEAN) + .expectedLogicalTypeAnnotation(null) + .expectedRepetition(Type.Repetition.OPTIONAL) + .expectedColMetaData(null) + .assertMatches(); + } + + @Test + public void buildFieldIcebergInt() { + ColumnMetadata testCol = + createColumnMetadataBuilder() + .logicalType("") + .sourceIcebergDataType("\"int\"") + .nullable(true) + .build(); + + ParquetTypeInfo typeInfo = ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); + createParquetTypeInfoAssertionBuilder() + .typeInfo(typeInfo) + .expectedFieldName("TESTCOL") + .expectedTypeLength(0) + .expectedPrimitiveTypeName(PrimitiveType.PrimitiveTypeName.INT32) + .expectedLogicalTypeAnnotation(null) + .expectedRepetition(Type.Repetition.OPTIONAL) + .expectedColMetaData(null) + 
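
The buildFieldIceberg* tests beginning above drive the Parquet mapping purely from sourceIcebergDataType, with an empty logicalType. Roughly the metadata a case like buildFieldIcebergInt constructs (a sketch; newBuilder() is an assumed factory name on the test's ColumnMetadataBuilder):

class IcebergColumnMetadataSketch {
  static ColumnMetadata demo() {
    // logicalType is deliberately empty: in Iceberg mode the serialized
    // Iceberg type alone selects the Parquet physical type and annotation.
    return ColumnMetadataBuilder.newBuilder() // factory name assumed
        .logicalType("")
        .sourceIcebergDataType("\"int\"")
        .nullable(true)
        .build();
  }
}
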
.assertMatches(); + } + + @Test + public void buildFieldIcebergLong() { + ColumnMetadata testCol = + createColumnMetadataBuilder() + .logicalType("") + .sourceIcebergDataType("\"long\"") + .nullable(true) + .build(); + + ParquetTypeInfo typeInfo = ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); + createParquetTypeInfoAssertionBuilder() + .typeInfo(typeInfo) + .expectedFieldName("TESTCOL") + .expectedTypeLength(0) + .expectedPrimitiveTypeName(PrimitiveType.PrimitiveTypeName.INT64) + .expectedLogicalTypeAnnotation(null) + .expectedRepetition(Type.Repetition.OPTIONAL) + .expectedColMetaData(null) + .assertMatches(); + } + + @Test + public void buildFieldIcebergFloat() { + ColumnMetadata testCol = + createColumnMetadataBuilder() + .logicalType("") + .sourceIcebergDataType("\"float\"") + .nullable(true) + .build(); + + ParquetTypeInfo typeInfo = ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); + createParquetTypeInfoAssertionBuilder() + .typeInfo(typeInfo) + .expectedFieldName("TESTCOL") + .expectedTypeLength(0) + .expectedPrimitiveTypeName(PrimitiveType.PrimitiveTypeName.FLOAT) + .expectedLogicalTypeAnnotation(null) + .expectedRepetition(Type.Repetition.OPTIONAL) + .expectedColMetaData(null) + .assertMatches(); + } + + @Test + public void buildFieldIcebergDouble() { + ColumnMetadata testCol = + createColumnMetadataBuilder() + .logicalType("") + .sourceIcebergDataType("\"double\"") + .nullable(true) + .build(); + + ParquetTypeInfo typeInfo = ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); + createParquetTypeInfoAssertionBuilder() + .typeInfo(typeInfo) + .expectedFieldName("TESTCOL") + .expectedTypeLength(0) + .expectedPrimitiveTypeName(PrimitiveType.PrimitiveTypeName.DOUBLE) + .expectedLogicalTypeAnnotation(null) + .expectedRepetition(Type.Repetition.OPTIONAL) + .expectedColMetaData(null) + .assertMatches(); + } + + @Test + public void buildFieldIcebergDecimal() { + ColumnMetadata testCol = + createColumnMetadataBuilder() + .logicalType("") + .sourceIcebergDataType("\"decimal(9, 2)\"") + .nullable(true) + .build(); + + ParquetTypeInfo typeInfo = ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); + createParquetTypeInfoAssertionBuilder() + .typeInfo(typeInfo) + .expectedFieldName("TESTCOL") + .expectedTypeLength(0) + .expectedPrimitiveTypeName(PrimitiveType.PrimitiveTypeName.INT32) + .expectedLogicalTypeAnnotation(LogicalTypeAnnotation.decimalType(2, 9)) + .expectedRepetition(Type.Repetition.OPTIONAL) + .expectedColMetaData(null) + .assertMatches(); + + testCol = + createColumnMetadataBuilder() + .logicalType("") + .sourceIcebergDataType("\"decimal(10, 4)\"") + .nullable(true) + .build(); + + typeInfo = ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); + createParquetTypeInfoAssertionBuilder() + .typeInfo(typeInfo) + .expectedFieldName("TESTCOL") + .expectedTypeLength(0) + .expectedPrimitiveTypeName(PrimitiveType.PrimitiveTypeName.INT64) + .expectedLogicalTypeAnnotation(LogicalTypeAnnotation.decimalType(4, 10)) + .expectedRepetition(Type.Repetition.OPTIONAL) + .expectedColMetaData(null) + .assertMatches(); + + testCol = + createColumnMetadataBuilder() + .logicalType("") + .sourceIcebergDataType("\"decimal(19, 1)\"") + .nullable(true) + .build(); + + typeInfo = ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); + createParquetTypeInfoAssertionBuilder() + .typeInfo(typeInfo) + .expectedFieldName("TESTCOL") + .expectedTypeLength(9) + .expectedPrimitiveTypeName(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) 
+ .expectedLogicalTypeAnnotation(LogicalTypeAnnotation.decimalType(1, 19)) + .expectedRepetition(Type.Repetition.OPTIONAL) + .expectedColMetaData(null) + .assertMatches(); + } + + @Test + public void buildFieldIcebergDate() { + ColumnMetadata testCol = + createColumnMetadataBuilder() + .logicalType("") + .sourceIcebergDataType("\"date\"") + .nullable(true) + .build(); + + ParquetTypeInfo typeInfo = ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); + createParquetTypeInfoAssertionBuilder() + .typeInfo(typeInfo) + .expectedFieldName("TESTCOL") + .expectedTypeLength(0) + .expectedPrimitiveTypeName(PrimitiveType.PrimitiveTypeName.INT32) + .expectedLogicalTypeAnnotation(LogicalTypeAnnotation.dateType()) + .expectedRepetition(Type.Repetition.OPTIONAL) + .expectedColMetaData(null) + .assertMatches(); + } + + @Test + public void buildFieldIcebergTime() { + ColumnMetadata testCol = + createColumnMetadataBuilder() + .logicalType("") + .sourceIcebergDataType("\"time\"") + .nullable(true) + .build(); + + ParquetTypeInfo typeInfo = ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); + createParquetTypeInfoAssertionBuilder() + .typeInfo(typeInfo) + .expectedFieldName("TESTCOL") + .expectedTypeLength(0) + .expectedPrimitiveTypeName(PrimitiveType.PrimitiveTypeName.INT64) + .expectedLogicalTypeAnnotation( + LogicalTypeAnnotation.timeType(false, LogicalTypeAnnotation.TimeUnit.MICROS)) + .expectedRepetition(Type.Repetition.OPTIONAL) + .expectedColMetaData(null) + .assertMatches(); + } + + @Test + public void buildFieldIcebergTimeStamp() { + ColumnMetadata testCol = + createColumnMetadataBuilder() + .logicalType("") + .sourceIcebergDataType("\"timestamp\"") + .nullable(true) + .build(); + + ParquetTypeInfo typeInfo = ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); + createParquetTypeInfoAssertionBuilder() + .typeInfo(typeInfo) + .expectedFieldName("TESTCOL") + .expectedTypeLength(0) + .expectedPrimitiveTypeName(PrimitiveType.PrimitiveTypeName.INT64) + .expectedLogicalTypeAnnotation( + LogicalTypeAnnotation.timestampType(false, LogicalTypeAnnotation.TimeUnit.MICROS)) + .expectedRepetition(Type.Repetition.OPTIONAL) + .expectedColMetaData(null) + .assertMatches(); + } + + @Test + public void buildFieldIcebergTimeStampTZ() { + ColumnMetadata testCol = + createColumnMetadataBuilder() + .logicalType("") + .sourceIcebergDataType("\"timestamptz\"") + .nullable(true) + .build(); + + ParquetTypeInfo typeInfo = ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); + createParquetTypeInfoAssertionBuilder() + .typeInfo(typeInfo) + .expectedFieldName("TESTCOL") + .expectedTypeLength(0) + .expectedPrimitiveTypeName(PrimitiveType.PrimitiveTypeName.INT64) + .expectedLogicalTypeAnnotation( + LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MICROS)) + .expectedRepetition(Type.Repetition.OPTIONAL) + .expectedColMetaData(null) + .assertMatches(); + } + + @Test + public void buildFieldIcebergString() { + ColumnMetadata testCol = + createColumnMetadataBuilder() + .logicalType("") + .sourceIcebergDataType("\"string\"") + .nullable(true) + .build(); + + ParquetTypeInfo typeInfo = ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); + createParquetTypeInfoAssertionBuilder() + .typeInfo(typeInfo) + .expectedFieldName("TESTCOL") + .expectedTypeLength(0) + .expectedPrimitiveTypeName(PrimitiveType.PrimitiveTypeName.BINARY) + .expectedLogicalTypeAnnotation(LogicalTypeAnnotation.stringType()) + .expectedRepetition(Type.Repetition.OPTIONAL) + 
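
The three decimal cases above pin the precision thresholds: precision <= 9 fits INT32, <= 18 fits INT64, and anything wider falls back to FIXED_LEN_BYTE_ARRAY sized to the precision; for DECIMAL(19,1) that is 9 bytes, since 10^19 - 1 needs 64 bits plus a sign bit. A sketch of the width computation (formula inferred from the expected lengths, not quoted from the library):

class DecimalWidthSketch {
  // Minimum bytes for a signed two's-complement unscaled value of the given precision.
  static int minBytesForPrecision(int precision) {
    int bits = (int) Math.ceil(precision * Math.log(10) / Math.log(2)) + 1; // +1 sign bit
    return (bits + 7) / 8;
  }

  static void demo() {
    int w9 = minBytesForPrecision(9);   // 4 -> INT32 is enough
    int w18 = minBytesForPrecision(18); // 8 -> INT64 is enough
    int w19 = minBytesForPrecision(19); // 9 -> FIXED_LEN_BYTE_ARRAY(9)
  }
}
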
.expectedColMetaData(null) + .assertMatches(); + } + + @Test + public void buildFieldIcebergFixed() { + ColumnMetadata testCol = + createColumnMetadataBuilder() + .logicalType("") + .sourceIcebergDataType("\"fixed[16]\"") + .nullable(true) + .build(); + + ParquetTypeInfo typeInfo = ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); + createParquetTypeInfoAssertionBuilder() + .typeInfo(typeInfo) + .expectedFieldName("TESTCOL") + .expectedTypeLength(16) + .expectedPrimitiveTypeName(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) + .expectedLogicalTypeAnnotation(null) + .expectedRepetition(Type.Repetition.OPTIONAL) + .expectedColMetaData(null) + .assertMatches(); + } + + @Test + public void buildFieldIcebergBinary() { + ColumnMetadata testCol = + createColumnMetadataBuilder() + .logicalType("") + .sourceIcebergDataType("\"binary\"") + .nullable(true) + .build(); + + ParquetTypeInfo typeInfo = ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); + createParquetTypeInfoAssertionBuilder() + .typeInfo(typeInfo) + .expectedFieldName("TESTCOL") + .expectedTypeLength(0) + .expectedPrimitiveTypeName(PrimitiveType.PrimitiveTypeName.BINARY) + .expectedLogicalTypeAnnotation(null) + .expectedRepetition(Type.Repetition.OPTIONAL) + .expectedColMetaData(null) + .assertMatches(); + } + /** Builder that helps to assert parquet type info */ private static class ParquetTypeInfoAssertionBuilder { private String fieldName; - private int fieldId; + private Integer fieldId; private PrimitiveType.PrimitiveTypeName primitiveTypeName; private LogicalTypeAnnotation logicalTypeAnnotation; private Type.Repetition repetition; - private int typeLength; + private Integer typeLength; private String colMetadata; - private ParquetTypeGenerator.ParquetTypeInfo typeInfo; + private ParquetTypeInfo typeInfo; + private Integer fieldCount; static ParquetTypeInfoAssertionBuilder newBuilder() { ParquetTypeInfoAssertionBuilder builder = new ParquetTypeInfoAssertionBuilder(); return builder; } - ParquetTypeInfoAssertionBuilder typeInfo(ParquetTypeGenerator.ParquetTypeInfo typeInfo) { + ParquetTypeInfoAssertionBuilder typeInfo(ParquetTypeInfo typeInfo) { this.typeInfo = typeInfo; return this; } @@ -450,17 +752,32 @@ ParquetTypeInfoAssertionBuilder expectedColMetaData(String colMetaData) { return this; } + ParquetTypeInfoAssertionBuilder expectedFieldCount(int fieldCount) { + this.fieldCount = fieldCount; + return this; + } + void assertMatches() { Type type = typeInfo.getParquetType(); Map metadata = typeInfo.getMetadata(); Assert.assertEquals(fieldName, type.getName()); - Assert.assertEquals(typeLength, type.asPrimitiveType().getTypeLength()); - Assert.assertEquals(fieldId, type.asPrimitiveType().getId().intValue()); - - Assert.assertEquals(primitiveTypeName, type.asPrimitiveType().getPrimitiveTypeName()); + if (typeLength != null) { + Assert.assertEquals(typeLength.intValue(), type.asPrimitiveType().getTypeLength()); + } + if (fieldId != null) { + Assert.assertEquals(fieldId.intValue(), type.asPrimitiveType().getId().intValue()); + } + if (primitiveTypeName != null) { + Assert.assertEquals(primitiveTypeName, type.asPrimitiveType().getPrimitiveTypeName()); + } Assert.assertEquals(logicalTypeAnnotation, type.getLogicalTypeAnnotation()); Assert.assertEquals(repetition, type.getRepetition()); - Assert.assertEquals(colMetadata, metadata.get(type.getId().toString())); + if (metadata != null) { + Assert.assertEquals(colMetadata, metadata.get(type.getId().toString())); + } + if (fieldCount != null) { + 
Assert.assertEquals(fieldCount.intValue(), type.asGroupType().getFieldCount()); + } } } @@ -469,6 +786,13 @@ private static ColumnMetadataBuilder createColumnMetadataBuilder() { } private static ParquetTypeInfoAssertionBuilder createParquetTypeInfoAssertionBuilder() { - return ParquetTypeInfoAssertionBuilder.newBuilder().expectedFieldId(COL_ORDINAL); + return createParquetTypeInfoAssertionBuilder(true); + } + + private static ParquetTypeInfoAssertionBuilder createParquetTypeInfoAssertionBuilder( + boolean expectedFieldId) { + return expectedFieldId + ? ParquetTypeInfoAssertionBuilder.newBuilder().expectedFieldId(COL_ORDINAL) + : ParquetTypeInfoAssertionBuilder.newBuilder(); } } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/ParquetValueParserAssertionBuilder.java b/src/test/java/net/snowflake/ingest/streaming/internal/ParquetValueParserAssertionBuilder.java new file mode 100644 index 000000000..8480311fa --- /dev/null +++ b/src/test/java/net/snowflake/ingest/streaming/internal/ParquetValueParserAssertionBuilder.java @@ -0,0 +1,104 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + +package net.snowflake.ingest.streaming.internal; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.nio.charset.StandardCharsets; +import org.junit.Assert; + +/** Builder that helps to assert parsing of values to parquet types */ +class ParquetValueParserAssertionBuilder { + private ParquetBufferValue parquetBufferValue; + private RowBufferStats rowBufferStats; + private Class valueClass; + private Object value; + private float size; + private Object minMaxStat; + private long currentNullCount; + + static ParquetValueParserAssertionBuilder newBuilder() { + ParquetValueParserAssertionBuilder builder = new ParquetValueParserAssertionBuilder(); + return builder; + } + + ParquetValueParserAssertionBuilder parquetBufferValue(ParquetBufferValue parquetBufferValue) { + this.parquetBufferValue = parquetBufferValue; + return this; + } + + ParquetValueParserAssertionBuilder rowBufferStats(RowBufferStats rowBufferStats) { + this.rowBufferStats = rowBufferStats; + return this; + } + + ParquetValueParserAssertionBuilder expectedValueClass(Class valueClass) { + this.valueClass = valueClass; + return this; + } + + ParquetValueParserAssertionBuilder expectedParsedValue(Object value) { + this.value = value; + return this; + } + + ParquetValueParserAssertionBuilder expectedSize(float size) { + this.size = size; + return this; + } + + public ParquetValueParserAssertionBuilder expectedMinMax(Object minMaxStat) { + this.minMaxStat = minMaxStat; + return this; + } + + public ParquetValueParserAssertionBuilder expectedNullCount(long currentNullCount) { + this.currentNullCount = currentNullCount; + return this; + } + + void assertMatches() { + Assert.assertEquals(valueClass, parquetBufferValue.getValue().getClass()); + if (valueClass.equals(byte[].class)) { + Assert.assertArrayEquals((byte[]) value, (byte[]) parquetBufferValue.getValue()); + } else { + Assert.assertEquals(value, parquetBufferValue.getValue()); + } + Assert.assertEquals(size, parquetBufferValue.getSize(), 0); + if (minMaxStat instanceof BigInteger) { + Assert.assertEquals(minMaxStat, rowBufferStats.getCurrentMinIntValue()); + Assert.assertEquals(minMaxStat, rowBufferStats.getCurrentMaxIntValue()); + return; + } else if (minMaxStat instanceof byte[]) { + Assert.assertArrayEquals((byte[]) minMaxStat, rowBufferStats.getCurrentMinStrValue()); + Assert.assertArrayEquals((byte[]) minMaxStat, 
rowBufferStats.getCurrentMaxStrValue()); + return; + } else if (valueClass.equals(String.class)) { + // String can have null min/max stats for variant data types + Object min = + rowBufferStats.getCurrentMinStrValue() != null + ? new String(rowBufferStats.getCurrentMinStrValue(), StandardCharsets.UTF_8) + : rowBufferStats.getCurrentMinStrValue(); + Object max = + rowBufferStats.getCurrentMaxStrValue() != null + ? new String(rowBufferStats.getCurrentMaxStrValue(), StandardCharsets.UTF_8) + : rowBufferStats.getCurrentMaxStrValue(); + Assert.assertEquals(minMaxStat, min); + Assert.assertEquals(minMaxStat, max); + return; + } else if (minMaxStat instanceof Double || minMaxStat instanceof BigDecimal) { + Assert.assertEquals(minMaxStat, rowBufferStats.getCurrentMinRealValue()); + Assert.assertEquals(minMaxStat, rowBufferStats.getCurrentMaxRealValue()); + return; + } + throw new IllegalArgumentException( + String.format("Unknown data type for min stat: %s", minMaxStat.getClass())); + } + + void assertNull() { + Assert.assertNull(parquetBufferValue.getValue()); + Assert.assertEquals(currentNullCount, rowBufferStats.getCurrentNullCount()); + } +} diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java index a25bdf7c1..70b950fda 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java @@ -19,6 +19,7 @@ import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import net.snowflake.ingest.connection.RequestBuilder; import net.snowflake.ingest.streaming.InsertValidationResponse; import net.snowflake.ingest.streaming.OpenChannelRequest; @@ -26,12 +27,23 @@ import net.snowflake.ingest.utils.ErrorCode; import net.snowflake.ingest.utils.SFException; import org.apache.commons.codec.binary.Hex; +import org.apache.commons.lang3.StringUtils; import org.apache.parquet.hadoop.BdecParquetReader; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +@RunWith(Parameterized.class) public class RowBufferTest { + @Parameterized.Parameters(name = "isIcebergMode: {0}") + public static Object[] isIcebergMode() { + return new Object[] {false, true}; + } + + @Parameterized.Parameter public static boolean isIcebergMode; + private AbstractRowBuffer rowBufferOnErrorContinue; private AbstractRowBuffer rowBufferOnErrorAbort; private AbstractRowBuffer rowBufferOnErrorSkipBatch; @@ -105,6 +117,16 @@ static List createSchema() { colChar.setLength(11); colChar.setScale(0); + if (isIcebergMode) { + colTinyIntCase.setSourceIcebergDataType("\"decimal(2,0)\""); + colTinyInt.setSourceIcebergDataType("\"decimal(1,0)\""); + colSmallInt.setSourceIcebergDataType("\"decimal(2,0)\""); + colInt.setSourceIcebergDataType("\"int\""); + colBigInt.setSourceIcebergDataType("\"long\""); + colDecimal.setSourceIcebergDataType("\"decimal(38,2)\""); + colChar.setSourceIcebergDataType("\"string\""); + } + List columns = Arrays.asList( colTinyIntCase, colTinyInt, colSmallInt, colInt, colBigInt, colDecimal, colChar); @@ -127,7 +149,8 @@ private AbstractRowBuffer createTestBuffer(OpenChannelRequest.OnErrorOption o MAX_CHUNK_SIZE_IN_BYTES_DEFAULT, MAX_ALLOWED_ROW_SIZE_IN_BYTES_DEFAULT, Constants.BdecParquetCompression.GZIP, - 
ENABLE_NEW_JSON_PARSING_LOGIC_DEFAULT), + ENABLE_NEW_JSON_PARSING_LOGIC_DEFAULT, + isIcebergMode), null, null); } @@ -273,9 +296,12 @@ public void testInvalidPhysicalType() { @Test public void testStringLength() { - testStringLengthHelper(this.rowBufferOnErrorContinue); - testStringLengthHelper(this.rowBufferOnErrorAbort); - testStringLengthHelper(this.rowBufferOnErrorSkipBatch); + /* Iceberg cannot specify max length of string */ + if (!isIcebergMode) { + testStringLengthHelper(this.rowBufferOnErrorContinue); + testStringLengthHelper(this.rowBufferOnErrorAbort); + testStringLengthHelper(this.rowBufferOnErrorSkipBatch); + } } @Test @@ -293,7 +319,7 @@ public void testRowIndexWithMultipleRowsWithErrorHelper(AbstractRowBuffer row rows.add(row); row = new HashMap<>(); - row.put("colChar", "1111111111111111111111"); // too big + row.put("colChar", StringUtils.repeat('1', 16777217)); // too big rows.add(row); row = new HashMap<>(); @@ -301,7 +327,7 @@ public void testRowIndexWithMultipleRowsWithErrorHelper(AbstractRowBuffer row rows.add(row); row = new HashMap<>(); - row.put("colChar", "1111111111111111111111"); // too big + row.put("colChar", StringUtils.repeat('1', 16777217)); // too big rows.add(row); InsertValidationResponse response = rowBuffer.insertRows(rows, null, null); @@ -333,8 +359,8 @@ public void testRowIndexWithMultipleRowsWithErrorHelper(AbstractRowBuffer row .equalsIgnoreCase( "The given row cannot be converted to the internal format due to invalid value:" + " Value cannot be ingested into Snowflake column COLCHAR of type STRING," - + " rowIndex:1, reason: String too long: length=22 characters maxLength=11" - + " characters")); + + " rowIndex:1, reason: String too long: length=16777217 bytes" + + " maxLength=16777216 bytes")); Assert.assertTrue( response .getInsertErrors() @@ -344,8 +370,8 @@ public void testRowIndexWithMultipleRowsWithErrorHelper(AbstractRowBuffer row .equalsIgnoreCase( "The given row cannot be converted to the internal format due to invalid value:" + " Value cannot be ingested into Snowflake column COLCHAR of type STRING," - + " rowIndex:3, reason: String too long: length=22 characters maxLength=11" - + " characters")); + + " rowIndex:3, reason: String too long: length=16777217 bytes" + + " maxLength=16777216 bytes")); } private void testStringLengthHelper(AbstractRowBuffer rowBuffer) { @@ -813,6 +839,12 @@ private void testStatsE2ETimestampHelper(OpenChannelRequest.OnErrorOption onErro colTimestampLtzSB16Scale6.setLogicalType("TIMESTAMP_LTZ"); colTimestampLtzSB16Scale6.setScale(6); + if (isIcebergMode) { + colTimestampLtzSB8.setSourceIcebergDataType("\"timestamptz\""); + colTimestampLtzSB16.setSourceIcebergDataType("\"timestamptz\""); + colTimestampLtzSB16Scale6.setSourceIcebergDataType("\"timestamptz\""); + } + innerBuffer.setupSchema( Arrays.asList(colTimestampLtzSB8, colTimestampLtzSB16, colTimestampLtzSB16Scale6)); @@ -838,18 +870,23 @@ private void testStatsE2ETimestampHelper(OpenChannelRequest.OnErrorOption onErro Assert.assertEquals(3, result.getRowCount()); Assert.assertEquals( - BigInteger.valueOf(1621899220), + BigInteger.valueOf(1621899220 * (isIcebergMode ? 1000000L : 1)), result.getColumnEps().get("COLTIMESTAMPLTZ_SB8").getCurrentMinIntValue()); Assert.assertEquals( - BigInteger.valueOf(1621899221), + BigInteger.valueOf(1621899221 * (isIcebergMode ? 
1000000L : 1)), result.getColumnEps().get("COLTIMESTAMPLTZ_SB8").getCurrentMaxIntValue()); - Assert.assertEquals( - new BigInteger("1621899220123456789"), - result.getColumnEps().get("COLTIMESTAMPLTZ_SB16").getCurrentMinIntValue()); - Assert.assertEquals( - new BigInteger("1621899220223456789"), - result.getColumnEps().get("COLTIMESTAMPLTZ_SB16").getCurrentMaxIntValue()); + /* Iceberg only supports microsecond precision for TIMESTAMP_LTZ */ + if (!isIcebergMode) { + Assert.assertEquals( + new BigInteger("1621899220123456789"), + result.getColumnEps().get("COLTIMESTAMPLTZ_SB16").getCurrentMinIntValue()); + Assert.assertEquals( + new BigInteger("1621899220223456789"), + result.getColumnEps().get("COLTIMESTAMPLTZ_SB16").getCurrentMaxIntValue()); + Assert.assertEquals( + 1, result.getColumnEps().get("COLTIMESTAMPLTZ_SB16").getCurrentNullCount()); + } Assert.assertEquals( new BigInteger("1621899220123456"), result.getColumnEps().get("COLTIMESTAMPLTZ_SB16_SCALE6").getCurrentMinIntValue()); @@ -859,7 +896,6 @@ private void testStatsE2ETimestampHelper(OpenChannelRequest.OnErrorOption onErro result.getColumnEps().get("COLTIMESTAMPLTZ_SB16_SCALE6").getCurrentMaxIntValue()); Assert.assertEquals(1, result.getColumnEps().get("COLTIMESTAMPLTZ_SB8").getCurrentNullCount()); - Assert.assertEquals(1, result.getColumnEps().get("COLTIMESTAMPLTZ_SB16").getCurrentNullCount()); Assert.assertEquals( 1, result.getColumnEps().get("COLTIMESTAMPLTZ_SB16_SCALE6").getCurrentNullCount()); } @@ -940,6 +976,11 @@ private void testE2ETimeHelper(OpenChannelRequest.OnErrorOption onErrorOption) { colTimeSB8.setLogicalType("TIME"); colTimeSB8.setScale(3); + if (isIcebergMode) { + colTimeSB4.setSourceIcebergDataType("\"time\""); + colTimeSB8.setSourceIcebergDataType("\"time\""); + } + innerBuffer.setupSchema(Arrays.asList(colTimeSB4, colTimeSB8)); Map<String, Object> row1 = new HashMap<>(); @@ -959,34 +1000,65 @@ private void testE2ETimeHelper(OpenChannelRequest.OnErrorOption onErrorOption) { Assert.assertFalse(response.hasErrors()); // Check data was inserted into the buffer correctly - Assert.assertEquals(10 * 60 * 60, innerBuffer.getVectorValueAt("COLTIMESB4", 0)); - Assert.assertEquals(11 * 60 * 60 + 15 * 60, innerBuffer.getVectorValueAt("COLTIMESB4", 1)); - Assert.assertNull(innerBuffer.getVectorValueAt("COLTIMESB4", 2)); + if (isIcebergMode) { + Assert.assertEquals(10 * 60 * 60 * 1000000L, innerBuffer.getVectorValueAt("COLTIMESB4", 0)); + Assert.assertEquals( + (11 * 60 * 60 + 15 * 60) * 1000000L, innerBuffer.getVectorValueAt("COLTIMESB4", 1)); + Assert.assertEquals( + (10 * 60 * 60 * 1000L + 123) * 1000L, innerBuffer.getVectorValueAt("COLTIMESB8", 0)); + Assert.assertEquals( + (11 * 60 * 60 * 1000L + 15 * 60 * 1000 + 456) * 1000L, + innerBuffer.getVectorValueAt("COLTIMESB8", 1)); + } else { + Assert.assertEquals(10 * 60 * 60, innerBuffer.getVectorValueAt("COLTIMESB4", 0)); + Assert.assertEquals(11 * 60 * 60 + 15 * 60, innerBuffer.getVectorValueAt("COLTIMESB4", 1)); + Assert.assertEquals( + 10 * 60 * 60 * 1000L + 123, innerBuffer.getVectorValueAt("COLTIMESB8", 0)); + Assert.assertEquals( + 11 * 60 * 60 * 1000L + 15 * 60 * 1000 + 456, + innerBuffer.getVectorValueAt("COLTIMESB8", 1)); + } - Assert.assertEquals(10 * 60 * 60 * 1000L + 123, innerBuffer.getVectorValueAt("COLTIMESB8", 0)); - Assert.assertEquals( - 11 * 60 * 60 * 1000L + 15 * 60 * 1000 + 456, innerBuffer.getVectorValueAt("COLTIMESB8", 1)); + Assert.assertNull(innerBuffer.getVectorValueAt("COLTIMESB4", 2)); Assert.assertNull(innerBuffer.getVectorValueAt("COLTIMESB8", 2)); // Check stats generation ChannelData result = innerBuffer.flush(); Assert.assertEquals(3, 
result.getRowCount()); - Assert.assertEquals( - BigInteger.valueOf(10 * 60 * 60), - result.getColumnEps().get("COLTIMESB4").getCurrentMinIntValue()); - Assert.assertEquals( - BigInteger.valueOf(11 * 60 * 60 + 15 * 60), - result.getColumnEps().get("COLTIMESB4").getCurrentMaxIntValue()); - Assert.assertEquals(1, result.getColumnEps().get("COLTIMESB4").getCurrentNullCount()); + if (isIcebergMode) { + Assert.assertEquals( + BigInteger.valueOf(10 * 60 * 60 * 1000000L), + result.getColumnEps().get("COLTIMESB4").getCurrentMinIntValue()); + Assert.assertEquals( + BigInteger.valueOf((11 * 60 * 60 + 15 * 60) * 1000000L), + result.getColumnEps().get("COLTIMESB4").getCurrentMaxIntValue()); + Assert.assertEquals(1, result.getColumnEps().get("COLTIMESB4").getCurrentNullCount()); - Assert.assertEquals( - BigInteger.valueOf(10 * 60 * 60 * 1000L + 123), - result.getColumnEps().get("COLTIMESB8").getCurrentMinIntValue()); - Assert.assertEquals( - BigInteger.valueOf(11 * 60 * 60 * 1000L + 15 * 60 * 1000 + 456), - result.getColumnEps().get("COLTIMESB8").getCurrentMaxIntValue()); - Assert.assertEquals(1, result.getColumnEps().get("COLTIMESB8").getCurrentNullCount()); + Assert.assertEquals( + BigInteger.valueOf((10 * 60 * 60 * 1000L + 123) * 1000L), + result.getColumnEps().get("COLTIMESB8").getCurrentMinIntValue()); + Assert.assertEquals( + BigInteger.valueOf((11 * 60 * 60 * 1000L + 15 * 60 * 1000 + 456) * 1000L), + result.getColumnEps().get("COLTIMESB8").getCurrentMaxIntValue()); + Assert.assertEquals(1, result.getColumnEps().get("COLTIMESB8").getCurrentNullCount()); + } else { + Assert.assertEquals( + BigInteger.valueOf(10 * 60 * 60), + result.getColumnEps().get("COLTIMESB4").getCurrentMinIntValue()); + Assert.assertEquals( + BigInteger.valueOf(11 * 60 * 60 + 15 * 60), + result.getColumnEps().get("COLTIMESB4").getCurrentMaxIntValue()); + Assert.assertEquals(1, result.getColumnEps().get("COLTIMESB4").getCurrentNullCount()); + + Assert.assertEquals( + BigInteger.valueOf(10 * 60 * 60 * 1000L + 123), + result.getColumnEps().get("COLTIMESB8").getCurrentMinIntValue()); + Assert.assertEquals( + BigInteger.valueOf(11 * 60 * 60 * 1000L + 15 * 60 * 1000 + 456), + result.getColumnEps().get("COLTIMESB8").getCurrentMaxIntValue()); + Assert.assertEquals(1, result.getColumnEps().get("COLTIMESB8").getCurrentNullCount()); + } } @Test @@ -1005,6 +1077,7 @@ private void testMaxInsertRowsBatchSizeHelper(OpenChannelRequest.OnErrorOption o colBinary.setLogicalType("BINARY"); colBinary.setLength(8 * 1024 * 1024); colBinary.setByteLength(8 * 1024 * 1024); + colBinary.setSourceIcebergDataType("\"binary\""); byte[] arr = new byte[8 * 1024 * 1024]; innerBuffer.setupSchema(Collections.singletonList(colBinary)); @@ -1289,6 +1362,9 @@ private void testE2EBinaryHelper(OpenChannelRequest.OnErrorOption onErrorOption) colBinary.setLength(32); colBinary.setByteLength(256); colBinary.setScale(0); + if (isIcebergMode) { + colBinary.setSourceIcebergDataType("\"binary\""); + } innerBuffer.setupSchema(Collections.singletonList(colBinary)); @@ -1531,9 +1607,11 @@ public void testOnErrorAbortSkipBatch() { @Test public void testE2EVariant() { - testE2EVariantHelper(OpenChannelRequest.OnErrorOption.ABORT); - testE2EVariantHelper(OpenChannelRequest.OnErrorOption.CONTINUE); - testE2EVariantHelper(OpenChannelRequest.OnErrorOption.SKIP_BATCH); + if (!isIcebergMode) { + testE2EVariantHelper(OpenChannelRequest.OnErrorOption.ABORT); + testE2EVariantHelper(OpenChannelRequest.OnErrorOption.CONTINUE); + 
testE2EVariantHelper(OpenChannelRequest.OnErrorOption.SKIP_BATCH); + } } private void testE2EVariantHelper(OpenChannelRequest.OnErrorOption onErrorOption) { @@ -1582,9 +1660,11 @@ private void testE2EVariantHelper(OpenChannelRequest.OnErrorOption onErrorOption @Test public void testE2EObject() { - testE2EObjectHelper(OpenChannelRequest.OnErrorOption.ABORT); - testE2EObjectHelper(OpenChannelRequest.OnErrorOption.CONTINUE); - testE2EObjectHelper(OpenChannelRequest.OnErrorOption.SKIP_BATCH); + if (!isIcebergMode) { + testE2EObjectHelper(OpenChannelRequest.OnErrorOption.ABORT); + testE2EObjectHelper(OpenChannelRequest.OnErrorOption.CONTINUE); + testE2EObjectHelper(OpenChannelRequest.OnErrorOption.SKIP_BATCH); + } } private void testE2EObjectHelper(OpenChannelRequest.OnErrorOption onErrorOption) { @@ -1615,9 +1695,11 @@ private void testE2EObjectHelper(OpenChannelRequest.OnErrorOption onErrorOption) @Test public void testE2EArray() { - testE2EArrayHelper(OpenChannelRequest.OnErrorOption.ABORT); - testE2EArrayHelper(OpenChannelRequest.OnErrorOption.CONTINUE); - testE2EArrayHelper(OpenChannelRequest.OnErrorOption.SKIP_BATCH); + if (!isIcebergMode) { + testE2EArrayHelper(OpenChannelRequest.OnErrorOption.ABORT); + testE2EArrayHelper(OpenChannelRequest.OnErrorOption.CONTINUE); + testE2EArrayHelper(OpenChannelRequest.OnErrorOption.SKIP_BATCH); + } } private void testE2EArrayHelper(OpenChannelRequest.OnErrorOption onErrorOption) { @@ -1697,7 +1779,8 @@ public void testOnErrorAbortRowsWithError() { // insert one valid and one invalid row List<Map<String, Object>> mixedRows = new ArrayList<>(); mixedRows.add(Collections.singletonMap("colChar", "b")); - mixedRows.add(Collections.singletonMap("colChar", "1111111111111111111111")); // too big + mixedRows.add( + Collections.singletonMap("colChar", StringUtils.repeat('1', 16777217))); // too big response = innerBufferOnErrorContinue.insertRows(mixedRows, "1", "3"); Assert.assertTrue(response.hasErrors()); @@ -1707,6 +1790,23 @@ public void testOnErrorAbortRowsWithError() { List<List<Object>> snapshotContinueParquet = ((ParquetChunkData) innerBufferOnErrorContinue.getSnapshot().get()).rows; + if (isIcebergMode) { + // Iceberg mode buffers string values as UTF-8 byte[]; convert them back to String for comparison + snapshotContinueParquet = + snapshotContinueParquet.stream() + .map( + row -> + row.stream() + .map( + obj -> { + if (obj instanceof byte[]) { + return new String((byte[]) obj, StandardCharsets.UTF_8); + } + return obj; + }) + .collect(Collectors.toList())) + .collect(Collectors.toList()); + } // validRows and only the good row from mixedRows are in the buffer Assert.assertEquals(2, snapshotContinueParquet.size()); Assert.assertEquals(Arrays.asList("a"), snapshotContinueParquet.get(0)); @@ -1714,6 +1814,23 @@ public void testOnErrorAbortRowsWithError() { List<List<Object>> snapshotAbortParquet = ((ParquetChunkData) innerBufferOnErrorAbort.getSnapshot().get()).rows; + if (isIcebergMode) { + // Iceberg mode buffers string values as UTF-8 byte[]; convert them back to String for comparison + snapshotAbortParquet = + snapshotAbortParquet.stream() + .map( + row -> + row.stream() + .map( + obj -> { + if (obj instanceof byte[]) { + return new String((byte[]) obj, StandardCharsets.UTF_8); + } + return obj; + }) + .collect(Collectors.toList())) + .collect(Collectors.toList()); + } // only validRows and none of the mixedRows are in the buffer Assert.assertEquals(1, snapshotAbortParquet.size()); Assert.assertEquals(Arrays.asList("a"), snapshotAbortParquet.get(0)); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/ParquetValueParserTest.java 
b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeParquetValueParserTest.java similarity index 76% rename from src/test/java/net/snowflake/ingest/streaming/internal/ParquetValueParserTest.java rename to src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeParquetValueParserTest.java index e7c74e3b7..64db57c31 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/ParquetValueParserTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeParquetValueParserTest.java @@ -1,9 +1,9 @@ package net.snowflake.ingest.streaming.internal; import static java.time.ZoneOffset.UTC; -import static net.snowflake.ingest.streaming.internal.ParquetValueParser.BIT_ENCODING_BYTE_LEN; -import static net.snowflake.ingest.streaming.internal.ParquetValueParser.BYTE_ARRAY_LENGTH_ENCODING_BYTE_LEN; -import static net.snowflake.ingest.streaming.internal.ParquetValueParser.DEFINITION_LEVEL_ENCODING_BYTE_LEN; +import static net.snowflake.ingest.streaming.internal.ParquetBufferValue.BIT_ENCODING_BYTE_LEN; +import static net.snowflake.ingest.streaming.internal.ParquetBufferValue.BYTE_ARRAY_LENGTH_ENCODING_BYTE_LEN; +import static net.snowflake.ingest.streaming.internal.ParquetBufferValue.DEFINITION_LEVEL_ENCODING_BYTE_LEN; import java.math.BigDecimal; import java.math.BigInteger; @@ -16,7 +16,7 @@ import org.junit.Assert; import org.junit.Test; -public class ParquetValueParserTest { +public class SnowflakeParquetValueParserTest { @Test public void parseValueFixedSB1ToInt32() { @@ -30,8 +30,8 @@ public void parseValueFixedSB1ToInt32() { .build(); RowBufferStats rowBufferStats = new RowBufferStats("COL1"); - ParquetValueParser.ParquetBufferValue pv = - ParquetValueParser.parseColumnValueToParquet( + ParquetBufferValue pv = + SnowflakeParquetValueParser.parseColumnValueToParquet( 12, testCol, PrimitiveType.PrimitiveTypeName.INT32, @@ -62,8 +62,8 @@ public void parseValueFixedSB2ToInt32() { .build(); RowBufferStats rowBufferStats = new RowBufferStats("COL1"); - ParquetValueParser.ParquetBufferValue pv = - ParquetValueParser.parseColumnValueToParquet( + ParquetBufferValue pv = + SnowflakeParquetValueParser.parseColumnValueToParquet( 1234, testCol, PrimitiveType.PrimitiveTypeName.INT32, @@ -94,8 +94,8 @@ public void parseValueFixedSB4ToInt32() { .build(); RowBufferStats rowBufferStats = new RowBufferStats("COL1"); - ParquetValueParser.ParquetBufferValue pv = - ParquetValueParser.parseColumnValueToParquet( + ParquetBufferValue pv = + SnowflakeParquetValueParser.parseColumnValueToParquet( 123456789, testCol, PrimitiveType.PrimitiveTypeName.INT32, @@ -126,8 +126,8 @@ public void parseValueFixedSB8ToInt64() { .build(); RowBufferStats rowBufferStats = new RowBufferStats("COL1"); - ParquetValueParser.ParquetBufferValue pv = - ParquetValueParser.parseColumnValueToParquet( + ParquetBufferValue pv = + SnowflakeParquetValueParser.parseColumnValueToParquet( 123456789987654321L, testCol, PrimitiveType.PrimitiveTypeName.INT64, @@ -158,8 +158,8 @@ public void parseValueFixedSB16ToByteArray() { .build(); RowBufferStats rowBufferStats = new RowBufferStats("COL1"); - ParquetValueParser.ParquetBufferValue pv = - ParquetValueParser.parseColumnValueToParquet( + ParquetBufferValue pv = + SnowflakeParquetValueParser.parseColumnValueToParquet( new BigDecimal("91234567899876543219876543211234567891"), testCol, PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, @@ -173,7 +173,7 @@ public void parseValueFixedSB16ToByteArray() { .rowBufferStats(rowBufferStats) .expectedValueClass(byte[].class) 
.expectedParsedValue( - ParquetValueParser.getSb16Bytes( + SnowflakeParquetValueParser.getSb16Bytes( new BigInteger("91234567899876543219876543211234567891"))) .expectedSize(16.0f + DEFINITION_LEVEL_ENCODING_BYTE_LEN) .expectedMinMax(new BigInteger("91234567899876543219876543211234567891")) @@ -192,8 +192,8 @@ public void parseValueFixedDecimalToInt32() { .build(); RowBufferStats rowBufferStats = new RowBufferStats("COL1"); - ParquetValueParser.ParquetBufferValue pv = - ParquetValueParser.parseColumnValueToParquet( + ParquetBufferValue pv = + SnowflakeParquetValueParser.parseColumnValueToParquet( new BigDecimal("12345.54321"), testCol, PrimitiveType.PrimitiveTypeName.DOUBLE, @@ -222,8 +222,8 @@ public void parseValueDouble() { .build(); RowBufferStats rowBufferStats = new RowBufferStats("COL1"); - ParquetValueParser.ParquetBufferValue pv = - ParquetValueParser.parseColumnValueToParquet( + ParquetBufferValue pv = + SnowflakeParquetValueParser.parseColumnValueToParquet( 12345.54321d, testCol, PrimitiveType.PrimitiveTypeName.DOUBLE, @@ -252,8 +252,8 @@ public void parseValueBoolean() { .build(); RowBufferStats rowBufferStats = new RowBufferStats("COL1"); - ParquetValueParser.ParquetBufferValue pv = - ParquetValueParser.parseColumnValueToParquet( + ParquetBufferValue pv = + SnowflakeParquetValueParser.parseColumnValueToParquet( true, testCol, PrimitiveType.PrimitiveTypeName.BOOLEAN, @@ -282,8 +282,8 @@ public void parseValueBinary() { .build(); RowBufferStats rowBufferStats = new RowBufferStats("COL1"); - ParquetValueParser.ParquetBufferValue pv = - ParquetValueParser.parseColumnValueToParquet( + ParquetBufferValue pv = + SnowflakeParquetValueParser.parseColumnValueToParquet( "1234abcd".getBytes(), testCol, PrimitiveType.PrimitiveTypeName.BINARY, @@ -327,8 +327,8 @@ private void testJsonWithLogicalType(String logicalType, boolean enableNewJsonPa "{\"key1\":-879869596,\"key2\":\"value2\",\"key3\":null," + "\"key4\":{\"key41\":0.032437,\"key42\":\"value42\",\"key43\":null}}"; RowBufferStats rowBufferStats = new RowBufferStats("COL1"); - ParquetValueParser.ParquetBufferValue pv = - ParquetValueParser.parseColumnValueToParquet( + ParquetBufferValue pv = + SnowflakeParquetValueParser.parseColumnValueToParquet( var, testCol, PrimitiveType.PrimitiveTypeName.BINARY, @@ -377,8 +377,8 @@ private void testNullJsonWithLogicalType(String var, boolean enableNewJsonParsin .build(); RowBufferStats rowBufferStats = new RowBufferStats("COL1"); - ParquetValueParser.ParquetBufferValue pv = - ParquetValueParser.parseColumnValueToParquet( + ParquetBufferValue pv = + SnowflakeParquetValueParser.parseColumnValueToParquet( var, testCol, PrimitiveType.PrimitiveTypeName.BINARY, @@ -418,8 +418,8 @@ public void parseValueArrayToBinaryInternal(boolean enableNewJsonParsingLogic) { input.put("c", "3"); RowBufferStats rowBufferStats = new RowBufferStats("COL1"); - ParquetValueParser.ParquetBufferValue pv = - ParquetValueParser.parseColumnValueToParquet( + ParquetBufferValue pv = + SnowflakeParquetValueParser.parseColumnValueToParquet( input, testCol, PrimitiveType.PrimitiveTypeName.BINARY, @@ -456,8 +456,8 @@ public void parseValueTextToBinary() { String text = "This is a sample text! 
Length is bigger than 32 bytes :)"; RowBufferStats rowBufferStats = new RowBufferStats("COL1"); - ParquetValueParser.ParquetBufferValue pv = - ParquetValueParser.parseColumnValueToParquet( + ParquetBufferValue pv = + SnowflakeParquetValueParser.parseColumnValueToParquet( text, testCol, PrimitiveType.PrimitiveTypeName.BINARY, @@ -497,7 +497,7 @@ public void parseValueTimestampNtzSB4Error() { Assert.assertThrows( SFException.class, () -> - ParquetValueParser.parseColumnValueToParquet( + SnowflakeParquetValueParser.parseColumnValueToParquet( "2013-04-28 20:57:00", testCol, PrimitiveType.PrimitiveTypeName.INT32, @@ -521,8 +521,8 @@ public void parseValueTimestampNtzSB8ToINT64() { .build(); RowBufferStats rowBufferStats = new RowBufferStats("COL1"); - ParquetValueParser.ParquetBufferValue pv = - ParquetValueParser.parseColumnValueToParquet( + ParquetBufferValue pv = + SnowflakeParquetValueParser.parseColumnValueToParquet( "2013-04-28T20:57:01.000", testCol, PrimitiveType.PrimitiveTypeName.INT64, @@ -552,8 +552,8 @@ public void parseValueTimestampNtzSB16ToByteArray() { .build(); RowBufferStats rowBufferStats = new RowBufferStats("COL1"); - ParquetValueParser.ParquetBufferValue pv = - ParquetValueParser.parseColumnValueToParquet( + ParquetBufferValue pv = + SnowflakeParquetValueParser.parseColumnValueToParquet( "2022-09-18T22:05:07.123456789", testCol, PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, @@ -567,7 +567,7 @@ public void parseValueTimestampNtzSB16ToByteArray() { .rowBufferStats(rowBufferStats) .expectedValueClass(byte[].class) .expectedParsedValue( - ParquetValueParser.getSb16Bytes(BigInteger.valueOf(1663538707123456789L))) + SnowflakeParquetValueParser.getSb16Bytes(BigInteger.valueOf(1663538707123456789L))) .expectedSize(16.0f + DEFINITION_LEVEL_ENCODING_BYTE_LEN) .expectedMinMax(BigInteger.valueOf(1663538707123456789L)) .assertMatches(); @@ -584,8 +584,8 @@ public void parseValueDateToInt32() { .build(); RowBufferStats rowBufferStats = new RowBufferStats("COL1"); - ParquetValueParser.ParquetBufferValue pv = - ParquetValueParser.parseColumnValueToParquet( + ParquetBufferValue pv = + SnowflakeParquetValueParser.parseColumnValueToParquet( "2021-01-01", testCol, PrimitiveType.PrimitiveTypeName.INT32, @@ -615,8 +615,8 @@ public void parseValueTimeSB4ToInt32() { .build(); RowBufferStats rowBufferStats = new RowBufferStats("COL1"); - ParquetValueParser.ParquetBufferValue pv = - ParquetValueParser.parseColumnValueToParquet( + ParquetBufferValue pv = + SnowflakeParquetValueParser.parseColumnValueToParquet( "01:00:00", testCol, PrimitiveType.PrimitiveTypeName.INT32, @@ -646,8 +646,8 @@ public void parseValueTimeSB8ToInt64() { .build(); RowBufferStats rowBufferStats = new RowBufferStats("COL1"); - ParquetValueParser.ParquetBufferValue pv = - ParquetValueParser.parseColumnValueToParquet( + ParquetBufferValue pv = + SnowflakeParquetValueParser.parseColumnValueToParquet( "01:00:00.123", testCol, PrimitiveType.PrimitiveTypeName.INT64, @@ -681,7 +681,7 @@ public void parseValueTimeSB16Error() { Assert.assertThrows( SFException.class, () -> - ParquetValueParser.parseColumnValueToParquet( + SnowflakeParquetValueParser.parseColumnValueToParquet( "11:00:00.12345678", testCol, PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, @@ -692,99 +692,4 @@ public void parseValueTimeSB16Error() { Assert.assertEquals( "Unknown data type for logical: TIME, physical: SB16.", exception.getMessage()); } - - /** Builder that helps to assert parsing of values to parquet types */ - private static class 
ParquetValueParserAssertionBuilder { - private ParquetValueParser.ParquetBufferValue parquetBufferValue; - private RowBufferStats rowBufferStats; - private Class valueClass; - private Object value; - private float size; - private Object minMaxStat; - private long currentNullCount; - - static ParquetValueParserAssertionBuilder newBuilder() { - ParquetValueParserAssertionBuilder builder = new ParquetValueParserAssertionBuilder(); - return builder; - } - - ParquetValueParserAssertionBuilder parquetBufferValue( - ParquetValueParser.ParquetBufferValue parquetBufferValue) { - this.parquetBufferValue = parquetBufferValue; - return this; - } - - ParquetValueParserAssertionBuilder rowBufferStats(RowBufferStats rowBufferStats) { - this.rowBufferStats = rowBufferStats; - return this; - } - - ParquetValueParserAssertionBuilder expectedValueClass(Class valueClass) { - this.valueClass = valueClass; - return this; - } - - ParquetValueParserAssertionBuilder expectedParsedValue(Object value) { - this.value = value; - return this; - } - - ParquetValueParserAssertionBuilder expectedSize(float size) { - this.size = size; - return this; - } - - public ParquetValueParserAssertionBuilder expectedMinMax(Object minMaxStat) { - this.minMaxStat = minMaxStat; - return this; - } - - public ParquetValueParserAssertionBuilder expectedNullCount(long currentNullCount) { - this.currentNullCount = currentNullCount; - return this; - } - - void assertMatches() { - Assert.assertEquals(valueClass, parquetBufferValue.getValue().getClass()); - if (valueClass.equals(byte[].class)) { - Assert.assertArrayEquals((byte[]) value, (byte[]) parquetBufferValue.getValue()); - } else { - Assert.assertEquals(value, parquetBufferValue.getValue()); - } - Assert.assertEquals(size, parquetBufferValue.getSize(), 0); - if (minMaxStat instanceof BigInteger) { - Assert.assertEquals(minMaxStat, rowBufferStats.getCurrentMinIntValue()); - Assert.assertEquals(minMaxStat, rowBufferStats.getCurrentMaxIntValue()); - return; - } else if (minMaxStat instanceof byte[]) { - Assert.assertArrayEquals((byte[]) minMaxStat, rowBufferStats.getCurrentMinStrValue()); - Assert.assertArrayEquals((byte[]) minMaxStat, rowBufferStats.getCurrentMaxStrValue()); - return; - } else if (valueClass.equals(String.class)) { - // String can have null min/max stats for variant data types - Object min = - rowBufferStats.getCurrentMinStrValue() != null - ? new String(rowBufferStats.getCurrentMinStrValue(), StandardCharsets.UTF_8) - : rowBufferStats.getCurrentMinStrValue(); - Object max = - rowBufferStats.getCurrentMaxStrValue() != null - ? 
new String(rowBufferStats.getCurrentMaxStrValue(), StandardCharsets.UTF_8) - : rowBufferStats.getCurrentMaxStrValue(); - Assert.assertEquals(minMaxStat, min); - Assert.assertEquals(minMaxStat, max); - return; - } else if (minMaxStat instanceof Double || minMaxStat instanceof BigDecimal) { - Assert.assertEquals(minMaxStat, rowBufferStats.getCurrentMinRealValue()); - Assert.assertEquals(minMaxStat, rowBufferStats.getCurrentMaxRealValue()); - return; - } - throw new IllegalArgumentException( - String.format("Unknown data type for min stat: %s", minMaxStat.getClass())); - } - - void assertNull() { - Assert.assertNull(parquetBufferValue.getValue()); - Assert.assertEquals(currentNullCount, rowBufferStats.getCurrentNullCount()); - } - } } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeServiceClientTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeServiceClientTest.java index 0e113c0ab..4815a5469 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeServiceClientTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeServiceClientTest.java @@ -186,6 +186,7 @@ public void testOpenChannel() throws IngestResponseException, IOException { "test_table", "test_channel", Constants.WriteMode.CLOUD_STORAGE, + false, "test_offset_token"); OpenChannelResponse openChannelResponse = snowflakeServiceClient.openChannel(openChannelRequest); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java index 9495b133e..d576c64fd 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java @@ -632,6 +632,9 @@ public void testInsertTooLargeRow() { col.setLogicalType("BINARY"); col.setLength(8388608); col.setByteLength(8388608); + if (isIcebergMode) { + col.setSourceIcebergDataType("\"binary\""); + } return col; }) .collect(Collectors.toList());
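A note on the recurring * 1000L and * 1000000L factors in the test expectations above: Iceberg stores time and timestamptz values at fixed microsecond precision, whereas the Snowflake-native path keeps each column's own scale (seconds at scale 0, milliseconds at scale 3, microseconds at scale 6). The sketch below mirrors the conversion rule the assertions encode; the class and method names (IcebergUnitScalingSketch, toExpectedUnits) are illustrative only and not part of the SDK.

import java.math.BigInteger;

/** Illustrative sketch only: reproduces the unit scaling asserted in RowBufferTest. */
public class IcebergUnitScalingSketch {

  /**
   * Converts a value expressed at the column's scale into the units the EP
   * (min/max) stats are expected to carry: Iceberg mode always uses
   * microseconds, the native path keeps the column scale unchanged.
   */
  static BigInteger toExpectedUnits(long valueAtColumnScale, int columnScale, boolean isIcebergMode) {
    if (!isIcebergMode) {
      return BigInteger.valueOf(valueAtColumnScale);
    }
    // scale 0 -> x 1_000_000, scale 3 -> x 1_000, scale 6 -> x 1
    long factor = (long) Math.pow(10, 6 - columnScale);
    return BigInteger.valueOf(valueAtColumnScale * factor);
  }

  public static void main(String[] args) {
    // 10:00:00 at scale 0 (seconds): 36_000 -> 36_000_000_000 micros in Iceberg mode
    System.out.println(toExpectedUnits(10 * 60 * 60, 0, true));
    // 10:00:00.123 at scale 3 (millis): 36_000_123 -> 36_000_123_000 micros
    System.out.println(toExpectedUnits(10 * 60 * 60 * 1000L + 123, 3, true));
    // scale-6 values (e.g. the SB16 column with scale 6) pass through unchanged
    System.out.println(toExpectedUnits(1621899220123456L, 6, true));
  }
}

The same rule explains why the SB16 nanosecond expectations are skipped in Iceberg mode: nanosecond (scale 9) values cannot be represented at microsecond precision, so those assertions only run on the native path.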