From 7eb7ba524a430e2173e3d92e9b0706bda231673c Mon Sep 17 00:00:00 2001 From: Alec Huang Date: Thu, 19 Sep 2024 13:28:31 -0700 Subject: [PATCH] SNOW-1507007 Support schema for new table format (#814) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit We use FDN-specific logical and physical data types only today. In this PR we change to use iceberg’s data types so there is no loss of signal between the table schema on the server versus what data type conversions are done in the client. --- pom.xml | 59 ++- scripts/process_licenses.py | 5 +- .../internal/ClientBufferParameters.java | 25 +- .../streaming/internal/ColumnMetadata.java | 19 +- .../internal/DataValidationUtil.java | 81 +++- .../internal/IcebergParquetValueParser.java | 301 +++++++++++++ .../internal/OpenChannelRequestInternal.java | 14 +- .../internal/ParquetBufferValue.java | 51 +++ .../streaming/internal/ParquetColumn.java | 20 + .../streaming/internal/ParquetRowBuffer.java | 47 +- .../internal/ParquetTypeGenerator.java | 194 ++++----- .../streaming/internal/ParquetTypeInfo.java | 30 ++ .../internal/RegisterBlobRequest.java | 12 +- ....java => SnowflakeParquetValueParser.java} | 64 +-- ...nowflakeStreamingIngestClientInternal.java | 93 ++-- .../net/snowflake/ingest/utils/Constants.java | 2 + .../ingest/utils/IcebergDataTypeParser.java | 198 +++++++++ .../ingest/utils/ParameterProvider.java | 2 + .../parquet/hadoop/BdecParquetWriter.java | 83 ++-- .../streaming/internal/BlobBuilderTest.java | 2 +- .../internal/ColumnMetadataBuilder.java | 15 +- .../internal/DataValidationUtilTest.java | 91 ++++ .../internal/IcebergDataTypeParserTest.java | 107 +++++ .../IcebergParquetValueParserTest.java | 325 ++++++++++++++ .../internal/ParquetTypeGeneratorTest.java | 404 ++++++++++++++++-- .../ParquetValueParserAssertionBuilder.java | 104 +++++ .../streaming/internal/RowBufferTest.java | 215 +++++++--- ...a => SnowflakeParquetValueParserTest.java} | 183 ++------ .../internal/SnowflakeServiceClientTest.java | 1 + .../SnowflakeStreamingIngestChannelTest.java | 3 + 30 files changed, 2240 insertions(+), 510 deletions(-) create mode 100644 src/main/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParser.java create mode 100644 src/main/java/net/snowflake/ingest/streaming/internal/ParquetBufferValue.java create mode 100644 src/main/java/net/snowflake/ingest/streaming/internal/ParquetColumn.java create mode 100644 src/main/java/net/snowflake/ingest/streaming/internal/ParquetTypeInfo.java rename src/main/java/net/snowflake/ingest/streaming/internal/{ParquetValueParser.java => SnowflakeParquetValueParser.java} (86%) create mode 100644 src/main/java/net/snowflake/ingest/utils/IcebergDataTypeParser.java create mode 100644 src/test/java/net/snowflake/ingest/streaming/internal/IcebergDataTypeParserTest.java create mode 100644 src/test/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParserTest.java create mode 100644 src/test/java/net/snowflake/ingest/streaming/internal/ParquetValueParserAssertionBuilder.java rename src/test/java/net/snowflake/ingest/streaming/internal/{ParquetValueParserTest.java => SnowflakeParquetValueParserTest.java} (76%) diff --git a/pom.xml b/pom.xml index 243bbce34..73483f5ed 100644 --- a/pom.xml +++ b/pom.xml @@ -1,4 +1,7 @@ + 4.0.0 @@ -48,6 +51,7 @@ 2.17.2 32.0.1-jre 3.3.6 + 1.3.1 true 0.8.5 ${project.build.directory}/dependency-jars @@ -262,6 +266,21 @@ + + org.apache.iceberg + iceberg-api + ${iceberg.version} + + + org.apache.iceberg + iceberg-core + 
${iceberg.version} + + + org.apache.iceberg + iceberg-parquet + ${iceberg.version} + org.apache.parquet parquet-column @@ -506,6 +525,34 @@ org.apache.hadoop hadoop-mapreduce-client-core + + org.apache.iceberg + iceberg-api + + + org.apache.iceberg + iceberg-core + + + io.airlift + aircompressor + + + + + org.apache.iceberg + iceberg-parquet + + + io.airlift + aircompressor + + + org.apache.parquet + parquet-avro + + + org.apache.parquet @@ -901,9 +948,10 @@ |Apache-2.0 |Apache License, Version 2.0 |Apache 2.0 - |Apache License V2.0 + |Apache License V2.0 + |Apache 2 BSD 2-Clause License - |The BSD License + |The BSD License |BSD The MIT License|MIT License 3-Clause BSD License|BSD-3-Clause @@ -1146,6 +1194,10 @@ io.airlift.compress ${shadeBase}.io.airlift.compress + + org.roaringbitmap + ${shadeBase}.org.roaringbitmap + @@ -1164,6 +1216,9 @@ META-INF/*.SF META-INF/*.DSA META-INF/*.RSA + LICENSE + NOTICE + iceberg-build.properties google/protobuf/**/*.proto diff --git a/scripts/process_licenses.py b/scripts/process_licenses.py index 4a0377a8e..9f715abd6 100644 --- a/scripts/process_licenses.py +++ b/scripts/process_licenses.py @@ -61,6 +61,9 @@ "org.bouncycastle:bcpkix-jdk18on": BOUNCY_CASTLE_LICENSE, "org.bouncycastle:bcutil-jdk18on": BOUNCY_CASTLE_LICENSE, "org.bouncycastle:bcprov-jdk18on": BOUNCY_CASTLE_LICENSE, + "com.thoughtworks.paranamer:paranamer": BSD_2_CLAUSE_LICENSE, + "org.roaringbitmap:RoaringBitmap": APACHE_LICENSE, + "org.roaringbitmap:shims": APACHE_LICENSE, } @@ -115,7 +118,7 @@ def main(): for zip_info in current_jar_as_zip.infolist(): if zip_info.is_dir(): continue - if zip_info.filename in ("META-INF/LICENSE.txt", "META-INF/LICENSE", "META-INF/LICENSE.md"): + if zip_info.filename in ("META-INF/LICENSE.txt", "META-INF/LICENSE", "META-INF/LICENSE.md", "LICENSE"): license_found = True dependency_with_license_count += 1 # Extract license to the target directory diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java b/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java index c73337d1e..7e482679a 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023 Snowflake Computing Inc. All rights reserved. + * Copyright (c) 2023-2024 Snowflake Computing Inc. All rights reserved. 
*/ package net.snowflake.ingest.streaming.internal; @@ -18,21 +18,26 @@ public class ClientBufferParameters { private Constants.BdecParquetCompression bdecParquetCompression; + private boolean isIcebergMode; + /** * Private constructor used for test methods * * @param maxChunkSizeInBytes maximum chunk size in bytes * @param maxAllowedRowSizeInBytes maximum row size in bytes + * @param isIcebergMode */ private ClientBufferParameters( long maxChunkSizeInBytes, long maxAllowedRowSizeInBytes, Constants.BdecParquetCompression bdecParquetCompression, - boolean enableNewJsonParsingLogic) { + boolean enableNewJsonParsingLogic, + boolean isIcebergMode) { this.maxChunkSizeInBytes = maxChunkSizeInBytes; this.maxAllowedRowSizeInBytes = maxAllowedRowSizeInBytes; this.bdecParquetCompression = bdecParquetCompression; this.enableNewJsonParsingLogic = enableNewJsonParsingLogic; + this.isIcebergMode = isIcebergMode; } /** @param clientInternal reference to the client object where the relevant parameters are set */ @@ -49,28 +54,34 @@ public ClientBufferParameters(SnowflakeStreamingIngestClientInternal clientInter clientInternal != null ? clientInternal.getParameterProvider().getBdecParquetCompressionAlgorithm() : ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT; - this.enableNewJsonParsingLogic = clientInternal != null ? clientInternal.getParameterProvider().isEnableNewJsonParsingLogic() : ParameterProvider.ENABLE_NEW_JSON_PARSING_LOGIC_DEFAULT; + this.isIcebergMode = + clientInternal != null + ? clientInternal.isIcebergMode() + : ParameterProvider.IS_ICEBERG_MODE_DEFAULT; } /** * @param maxChunkSizeInBytes maximum chunk size in bytes * @param maxAllowedRowSizeInBytes maximum row size in bytes + * @param isIcebergMode * @return ClientBufferParameters object */ public static ClientBufferParameters test_createClientBufferParameters( long maxChunkSizeInBytes, long maxAllowedRowSizeInBytes, Constants.BdecParquetCompression bdecParquetCompression, - boolean enableNewJsonParsingLogic) { + boolean enableNewJsonParsingLogic, + boolean isIcebergMode) { return new ClientBufferParameters( maxChunkSizeInBytes, maxAllowedRowSizeInBytes, bdecParquetCompression, - enableNewJsonParsingLogic); + enableNewJsonParsingLogic, + isIcebergMode); } public long getMaxChunkSizeInBytes() { @@ -88,4 +99,8 @@ public Constants.BdecParquetCompression getBdecParquetCompression() { public boolean isEnableNewJsonParsingLogic() { return enableNewJsonParsingLogic; } + + public boolean getIsIcebergMode() { + return isIcebergMode; + } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ColumnMetadata.java b/src/main/java/net/snowflake/ingest/streaming/internal/ColumnMetadata.java index 73b75da9e..1231247b5 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ColumnMetadata.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ColumnMetadata.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 Snowflake Computing Inc. All rights reserved. + * Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved. */ package net.snowflake.ingest.streaming.internal; @@ -22,6 +22,13 @@ class ColumnMetadata { private boolean nullable; private String collation; + /** + * The Json serialization of Iceberg data type of the column, see JSON serialization + * for more details. + */ + private String sourceIcebergDataType; + /** * The column ordinal is an internal id of the column used by server scanner for the column * identification. 
@@ -128,6 +135,15 @@ public Integer getOrdinal() { return ordinal; } + @JsonProperty("source_iceberg_data_type") + void setSourceIcebergDataType(String sourceIcebergDataType) { + this.sourceIcebergDataType = sourceIcebergDataType; + } + + public String getSourceIcebergDataType() { + return sourceIcebergDataType; + } + String getInternalName() { return internalName; } @@ -144,6 +160,7 @@ public String toString() { map.put("byte_length", this.byteLength); map.put("length", this.length); map.put("nullable", this.nullable); + map.put("source_iceberg_datatype", this.sourceIcebergDataType); return map.toString(); } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/DataValidationUtil.java b/src/main/java/net/snowflake/ingest/streaming/internal/DataValidationUtil.java index 310a711d0..6e3281997 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/DataValidationUtil.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/DataValidationUtil.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 Snowflake Computing Inc. All rights reserved. + * Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved. */ package net.snowflake.ingest.streaming.internal; @@ -19,6 +19,7 @@ import java.io.IOException; import java.math.BigDecimal; import java.math.BigInteger; +import java.math.RoundingMode; import java.nio.charset.StandardCharsets; import java.time.Instant; import java.time.LocalDate; @@ -718,7 +719,13 @@ static BigDecimal validateAndParseBigDecimal( || input instanceof Long) { return BigDecimal.valueOf(((Number) input).longValue()); } else if (input instanceof Float || input instanceof Double) { - return BigDecimal.valueOf(((Number) input).doubleValue()); + try { + return BigDecimal.valueOf(((Number) input).doubleValue()); + } catch (NumberFormatException e) { + /* NaN and infinity are not allowed */ + throw valueFormatNotAllowedException( + columnName, "NUMBER", "Not a valid number", insertRowIndex); + } } else if (input instanceof String) { try { final String stringInput = ((String) input).trim(); @@ -957,6 +964,66 @@ static double validateAndParseReal(String columnName, Object input, long insertR columnName, input.getClass(), "REAL", new String[] {"Number", "String"}, insertRowIndex); } + /** + * Validates and parses input Iceberg INT column. Allowed Java types: + * + *
+   * <ul>
+   *   <li>Number
+   *   <li>String
+   * </ul>
+ * + * @param columnName Column name, used in validation error messages + * @param input Object to validate and parse + * @param insertRowIndex Row index for error reporting + * @return Parsed integer + */ + static int validateAndParseIcebergInt(String columnName, Object input, long insertRowIndex) { + BigDecimal roundedValue = + validateAndParseBigDecimal(columnName, input, insertRowIndex) + .setScale(0, RoundingMode.HALF_UP); + try { + return roundedValue.intValueExact(); + } catch (ArithmeticException e) { + /* overflow */ + throw new SFException( + ErrorCode.INVALID_VALUE_ROW, + String.format( + "Number out of representable inclusive range of integers between %d and %d," + + " rowIndex:%d", + Integer.MIN_VALUE, Integer.MAX_VALUE, insertRowIndex)); + } + } + + /** + * Validates and parses input Iceberg LONG column. Allowed Java types: + * + *
+   * <ul>
+   *   <li>Number
+   *   <li>String
+   * </ul>
+ * + * @param columnName Column name, used in validation error messages + * @param input Object to validate and parse + * @param insertRowIndex Row index for error reporting + * @return Parsed long + */ + static long validateAndParseIcebergLong(String columnName, Object input, long insertRowIndex) { + BigDecimal roundedValue = + validateAndParseBigDecimal(columnName, input, insertRowIndex) + .setScale(0, RoundingMode.HALF_UP); + try { + return roundedValue.longValueExact(); + } catch (ArithmeticException e) { + /* overflow */ + throw new SFException( + ErrorCode.INVALID_VALUE_ROW, + String.format( + "Number out of representable inclusive range of integers between %d and %d," + + " rowIndex:%d", + Long.MIN_VALUE, Long.MAX_VALUE, insertRowIndex)); + } + } + /** * Validate and parse input to integer output, 1=true, 0=false. String values converted to boolean * according to https://docs.snowflake.com/en/sql-reference/functions/to_boolean.html#usage-notes @@ -1003,6 +1070,16 @@ static void checkValueInRange( } } + static void checkFixedLengthByteArray(byte[] bytes, int length, final long insertRowIndex) { + if (bytes.length != length) { + throw new SFException( + ErrorCode.INVALID_FORMAT_ROW, + String.format( + "Binary length mismatch: expected=%d, actual=%d, rowIndex:%d", + length, bytes.length, insertRowIndex)); + } + } + static Set allowedBooleanStringsLowerCased = Sets.newHashSet("1", "0", "yes", "no", "y", "n", "t", "f", "true", "false", "on", "off"); diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParser.java b/src/main/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParser.java new file mode 100644 index 000000000..18a66f4d5 --- /dev/null +++ b/src/main/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParser.java @@ -0,0 +1,301 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + +package net.snowflake.ingest.streaming.internal; + +import static net.snowflake.ingest.streaming.internal.DataValidationUtil.checkFixedLengthByteArray; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.math.RoundingMode; +import java.nio.charset.StandardCharsets; +import java.time.ZoneId; +import java.util.Arrays; +import java.util.Optional; +import net.snowflake.ingest.utils.Constants; +import net.snowflake.ingest.utils.ErrorCode; +import net.snowflake.ingest.utils.SFException; +import net.snowflake.ingest.utils.Utils; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.DateLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.StringLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.TimeLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; +import org.apache.parquet.schema.Type.Repetition; + +/** Parses a user Iceberg column value into Parquet internal representation for buffering. */ +class IcebergParquetValueParser { + + /** + * Parses a user column value into Parquet internal representation for buffering. 
+ * + * @param value column value provided by user in a row + * @param type Parquet column type + * @param stats column stats to update + * @param defaultTimezone default timezone to use for timestamp parsing + * @param insertRowsCurrIndex Row index corresponding the row to parse (w.r.t input rows in + * insertRows API, and not buffered row) + * @return parsed value and byte size of Parquet internal representation + */ + static ParquetBufferValue parseColumnValueToParquet( + Object value, + Type type, + RowBufferStats stats, + ZoneId defaultTimezone, + long insertRowsCurrIndex) { + Utils.assertNotNull("Parquet column stats", stats); + float estimatedParquetSize = 0F; + if (value != null) { + estimatedParquetSize += ParquetBufferValue.DEFINITION_LEVEL_ENCODING_BYTE_LEN; + PrimitiveType primitiveType = type.asPrimitiveType(); + switch (primitiveType.getPrimitiveTypeName()) { + case BOOLEAN: + int intValue = + DataValidationUtil.validateAndParseBoolean( + type.getName(), value, insertRowsCurrIndex); + value = intValue > 0; + stats.addIntValue(BigInteger.valueOf(intValue)); + estimatedParquetSize += ParquetBufferValue.BIT_ENCODING_BYTE_LEN; + break; + case INT32: + int intVal = getInt32Value(value, primitiveType, insertRowsCurrIndex); + value = intVal; + stats.addIntValue(BigInteger.valueOf(intVal)); + estimatedParquetSize += 4; + break; + case INT64: + long longVal = getInt64Value(value, primitiveType, defaultTimezone, insertRowsCurrIndex); + value = longVal; + stats.addIntValue(BigInteger.valueOf(longVal)); + estimatedParquetSize += 8; + break; + case FLOAT: + float floatVal = + (float) + DataValidationUtil.validateAndParseReal( + type.getName(), value, insertRowsCurrIndex); + value = floatVal; + stats.addRealValue((double) floatVal); + estimatedParquetSize += 4; + break; + case DOUBLE: + double doubleVal = + DataValidationUtil.validateAndParseReal(type.getName(), value, insertRowsCurrIndex); + value = doubleVal; + stats.addRealValue(doubleVal); + estimatedParquetSize += 8; + break; + case BINARY: + byte[] byteVal = getBinaryValue(value, primitiveType, stats, insertRowsCurrIndex); + value = byteVal; + estimatedParquetSize += + ParquetBufferValue.BYTE_ARRAY_LENGTH_ENCODING_BYTE_LEN + byteVal.length; + break; + case FIXED_LEN_BYTE_ARRAY: + byte[] fixedLenByteArrayVal = + getFixedLenByteArrayValue(value, primitiveType, stats, insertRowsCurrIndex); + value = fixedLenByteArrayVal; + estimatedParquetSize += + ParquetBufferValue.BYTE_ARRAY_LENGTH_ENCODING_BYTE_LEN + fixedLenByteArrayVal.length; + break; + default: + throw new SFException( + ErrorCode.UNKNOWN_DATA_TYPE, + type.getLogicalTypeAnnotation(), + primitiveType.getPrimitiveTypeName()); + } + } + + if (value == null) { + if (type.isRepetition(Repetition.REQUIRED)) { + throw new SFException( + ErrorCode.INVALID_FORMAT_ROW, type.getName(), "Passed null to non nullable field"); + } + stats.incCurrentNullCount(); + } + + return new ParquetBufferValue(value, estimatedParquetSize); + } + + /** + * Parses an int32 value based on Parquet logical type. 
+ * + * @param value column value provided by user in a row + * @param type Parquet column type + * @param insertRowsCurrIndex Used for logging the row of index given in insertRows API + * @return parsed int32 value + */ + private static int getInt32Value( + Object value, PrimitiveType type, final long insertRowsCurrIndex) { + LogicalTypeAnnotation logicalTypeAnnotation = type.getLogicalTypeAnnotation(); + if (logicalTypeAnnotation == null) { + return DataValidationUtil.validateAndParseIcebergInt( + type.getName(), value, insertRowsCurrIndex); + } + if (logicalTypeAnnotation instanceof DecimalLogicalTypeAnnotation) { + return getDecimalValue(value, type, insertRowsCurrIndex).unscaledValue().intValue(); + } + if (logicalTypeAnnotation instanceof DateLogicalTypeAnnotation) { + return DataValidationUtil.validateAndParseDate(type.getName(), value, insertRowsCurrIndex); + } + throw new SFException( + ErrorCode.UNKNOWN_DATA_TYPE, logicalTypeAnnotation, type.getPrimitiveTypeName()); + } + + /** + * Parses an int64 value based on Parquet logical type. + * + * @param value column value provided by user in a row + * @param type Parquet column type + * @param insertRowsCurrIndex Used for logging the row of index given in insertRows API + * @return parsed int64 value + */ + private static long getInt64Value( + Object value, PrimitiveType type, ZoneId defaultTimezone, final long insertRowsCurrIndex) { + LogicalTypeAnnotation logicalTypeAnnotation = type.getLogicalTypeAnnotation(); + if (logicalTypeAnnotation == null) { + return DataValidationUtil.validateAndParseIcebergLong( + type.getName(), value, insertRowsCurrIndex); + } + if (logicalTypeAnnotation instanceof DecimalLogicalTypeAnnotation) { + return getDecimalValue(value, type, insertRowsCurrIndex).unscaledValue().longValue(); + } + if (logicalTypeAnnotation instanceof TimeLogicalTypeAnnotation) { + return DataValidationUtil.validateAndParseTime( + type.getName(), + value, + timeUnitToScale(((TimeLogicalTypeAnnotation) logicalTypeAnnotation).getUnit()), + insertRowsCurrIndex) + .longValue(); + } + if (logicalTypeAnnotation instanceof TimestampLogicalTypeAnnotation) { + boolean includeTimeZone = + ((TimestampLogicalTypeAnnotation) logicalTypeAnnotation).isAdjustedToUTC(); + return DataValidationUtil.validateAndParseTimestamp( + type.getName(), + value, + timeUnitToScale(((TimestampLogicalTypeAnnotation) logicalTypeAnnotation).getUnit()), + defaultTimezone, + !includeTimeZone, + insertRowsCurrIndex) + .toBinary(false) + .longValue(); + } + throw new SFException( + ErrorCode.UNKNOWN_DATA_TYPE, + logicalTypeAnnotation, + type.asPrimitiveType().getPrimitiveTypeName()); + } + + /** + * Converts an Iceberg binary or string column to its byte array representation. 
+ * + * @param value value to parse + * @param type Parquet column type + * @param stats column stats to update + * @param insertRowsCurrIndex Used for logging the row of index given in insertRows API + * @return string representation + */ + private static byte[] getBinaryValue( + Object value, PrimitiveType type, RowBufferStats stats, final long insertRowsCurrIndex) { + LogicalTypeAnnotation logicalTypeAnnotation = type.getLogicalTypeAnnotation(); + if (logicalTypeAnnotation == null) { + byte[] bytes = + DataValidationUtil.validateAndParseBinary( + type.getName(), + value, + Optional.of(Constants.BINARY_COLUMN_MAX_SIZE), + insertRowsCurrIndex); + stats.addBinaryValue(bytes); + return bytes; + } + if (logicalTypeAnnotation instanceof StringLogicalTypeAnnotation) { + String string = + DataValidationUtil.validateAndParseString( + type.getName(), + value, + Optional.of(Constants.VARCHAR_COLUMN_MAX_SIZE), + insertRowsCurrIndex); + stats.addStrValue(string); + return string.getBytes(StandardCharsets.UTF_8); + } + throw new SFException( + ErrorCode.UNKNOWN_DATA_TYPE, logicalTypeAnnotation, type.getPrimitiveTypeName()); + } + + /** + * Converts an Iceberg fixed length byte array column to its byte array representation. + * + * @param value value to parse + * @param type Parquet column type + * @param stats column stats to update + * @param insertRowsCurrIndex Used for logging the row of index given in insertRows API + * @return string representation + */ + private static byte[] getFixedLenByteArrayValue( + Object value, PrimitiveType type, RowBufferStats stats, final long insertRowsCurrIndex) { + LogicalTypeAnnotation logicalTypeAnnotation = type.getLogicalTypeAnnotation(); + int length = type.getTypeLength(); + byte[] bytes = null; + if (logicalTypeAnnotation == null) { + bytes = + DataValidationUtil.validateAndParseBinary( + type.getName(), value, Optional.of(length), insertRowsCurrIndex); + stats.addBinaryValue(bytes); + } + if (logicalTypeAnnotation instanceof DecimalLogicalTypeAnnotation) { + BigInteger bigIntegerVal = getDecimalValue(value, type, insertRowsCurrIndex).unscaledValue(); + stats.addIntValue(bigIntegerVal); + bytes = bigIntegerVal.toByteArray(); + if (bytes.length < length) { + byte[] newBytes = new byte[length]; + Arrays.fill(newBytes, (byte) (bytes[0] < 0 ? -1 : 0)); + System.arraycopy(bytes, 0, newBytes, length - bytes.length, bytes.length); + bytes = newBytes; + } + } + if (bytes != null) { + checkFixedLengthByteArray(bytes, length, insertRowsCurrIndex); + return bytes; + } + throw new SFException( + ErrorCode.UNKNOWN_DATA_TYPE, logicalTypeAnnotation, type.getPrimitiveTypeName()); + } + + /** + * Converts a decimal value to its BigDecimal representation. 
+ * + * @param value value to parse + * @param type Parquet column type + * @param insertRowsCurrIndex Used for logging the row of index given in insertRows API + * @return BigDecimal representation + */ + private static BigDecimal getDecimalValue( + Object value, PrimitiveType type, final long insertRowsCurrIndex) { + int scale = ((DecimalLogicalTypeAnnotation) type.getLogicalTypeAnnotation()).getScale(); + int precision = ((DecimalLogicalTypeAnnotation) type.getLogicalTypeAnnotation()).getPrecision(); + BigDecimal bigDecimalValue = + DataValidationUtil.validateAndParseBigDecimal(type.getName(), value, insertRowsCurrIndex); + bigDecimalValue = bigDecimalValue.setScale(scale, RoundingMode.HALF_UP); + DataValidationUtil.checkValueInRange(bigDecimalValue, scale, precision, insertRowsCurrIndex); + return bigDecimalValue; + } + + private static int timeUnitToScale(LogicalTypeAnnotation.TimeUnit timeUnit) { + switch (timeUnit) { + case MILLIS: + return 3; + case MICROS: + return 6; + case NANOS: + return 9; + default: + throw new SFException( + ErrorCode.INTERNAL_ERROR, String.format("Unknown time unit: %s", timeUnit)); + } + } +} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/OpenChannelRequestInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/OpenChannelRequestInternal.java index ff53f6729..c3c427c68 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/OpenChannelRequestInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/OpenChannelRequestInternal.java @@ -32,6 +32,10 @@ class OpenChannelRequestInternal implements IStreamingIngestRequest { @JsonProperty("write_mode") private String writeMode; + @JsonInclude(JsonInclude.Include.NON_NULL) + @JsonProperty("is_iceberg") + private boolean isIceberg; + @JsonInclude(JsonInclude.Include.NON_NULL) @JsonProperty("offset_token") private String offsetToken; @@ -44,6 +48,7 @@ class OpenChannelRequestInternal implements IStreamingIngestRequest { String table, String channel, Constants.WriteMode writeMode, + boolean isIceberg, String offsetToken) { this.requestId = requestId; this.role = role; @@ -52,6 +57,7 @@ class OpenChannelRequestInternal implements IStreamingIngestRequest { this.table = table; this.channel = channel; this.writeMode = writeMode.name(); + this.isIceberg = isIceberg; this.offsetToken = offsetToken; } @@ -83,6 +89,10 @@ String getWriteMode() { return writeMode; } + boolean getIsIceberg() { + return isIceberg; + } + String getOffsetToken() { return offsetToken; } @@ -91,7 +101,7 @@ String getOffsetToken() { public String getStringForLogging() { return String.format( "OpenChannelRequestInternal(requestId=%s, role=%s, db=%s, schema=%s, table=%s, channel=%s," - + " writeMode=%s)", - requestId, role, database, schema, table, channel, writeMode); + + " writeMode=%s, isIceberg=%s)", + requestId, role, database, schema, table, channel, writeMode, isIceberg); } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetBufferValue.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetBufferValue.java new file mode 100644 index 000000000..f89de0aa7 --- /dev/null +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetBufferValue.java @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + +package net.snowflake.ingest.streaming.internal; + +/** Parquet internal value representation for buffering. 
*/ +class ParquetBufferValue { + // Parquet uses BitPacking to encode boolean, hence 1 bit per value + public static final float BIT_ENCODING_BYTE_LEN = 1.0f / 8; + + /** + * On average parquet needs 2 bytes / 8 values for the RLE+bitpack encoded definition level. + * + *
+   * <p>There are two cases how the definition level (0 for null values, 1 for non-null values) is
+   * encoded:
+   *
+   * <ul>
+   *   <li>If there are at least 8 repeated values in a row, they are run-length encoded (length +
+   *       value itself). E.g. 11111111 -> 8 1
+   *   <li>If there are fewer than 8 repeated values, they are written in a group as part of a
+   *       bit-packed run, e.g. 1111 -> 15. A bit-packed run ends when either 64 groups of 8
+   *       values have been written or a new RLE run starts.
+   * </ul>
+   *
+   * <p>To distinguish between an RLE run and a bit-packed run, 1 extra byte is written as a
+   * header when a bit-packed run starts.
+   *
+   * <p>For more details see ColumnWriterV1#createDLWriter and {@link
+   * org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder#writeInt(int)}
+   *
+ */ + public static final float DEFINITION_LEVEL_ENCODING_BYTE_LEN = 2.0f / 8; + + // Parquet stores length in 4 bytes before the actual data bytes + public static final int BYTE_ARRAY_LENGTH_ENCODING_BYTE_LEN = 4; + private final Object value; + private final float size; + + ParquetBufferValue(Object value, float size) { + this.value = value; + this.size = size; + } + + Object getValue() { + return value; + } + + float getSize() { + return size; + } +} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetColumn.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetColumn.java new file mode 100644 index 000000000..68ed7a8c9 --- /dev/null +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetColumn.java @@ -0,0 +1,20 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + +package net.snowflake.ingest.streaming.internal; + +import org.apache.parquet.schema.Type; + +/** Represents a column in a Parquet file. */ +class ParquetColumn { + final ColumnMetadata columnMetadata; + final int index; + final Type type; + + ParquetColumn(ColumnMetadata columnMetadata, int index, Type type) { + this.columnMetadata = columnMetadata; + this.index = index; + this.type = type; + } +} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java index bcd01aea3..395bea98f 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022 Snowflake Computing Inc. All rights reserved. + * Copyright (c) 2022-2024 Snowflake Computing Inc. All rights reserved. 
*/ package net.snowflake.ingest.streaming.internal; @@ -13,6 +13,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.function.Consumer; @@ -25,7 +26,6 @@ import net.snowflake.ingest.utils.ErrorCode; import net.snowflake.ingest.utils.SFException; import org.apache.parquet.schema.MessageType; -import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Type; /** @@ -81,14 +81,13 @@ public void setupSchema(List columns) { int id = 1; for (ColumnMetadata column : columns) { validateColumnCollation(column); - ParquetTypeGenerator.ParquetTypeInfo typeInfo = - ParquetTypeGenerator.generateColumnParquetTypeInfo(column, id); + ParquetTypeInfo typeInfo = ParquetTypeGenerator.generateColumnParquetTypeInfo(column, id); parquetTypes.add(typeInfo.getParquetType()); this.metadata.putAll(typeInfo.getMetadata()); int columnIndex = parquetTypes.size() - 1; fieldIndex.put( column.getInternalName(), - new ParquetColumn(column, columnIndex, typeInfo.getPrimitiveTypeName())); + new ParquetColumn(column, columnIndex, typeInfo.getParquetType())); if (!column.getNullable()) { addNonNullableFieldName(column.getInternalName()); } @@ -172,15 +171,18 @@ private float addRow( RowBufferStats forkedStats = statsMap.get(columnName).forkEmpty(); forkedStatsMap.put(columnName, forkedStats); ColumnMetadata column = parquetColumn.columnMetadata; - ParquetValueParser.ParquetBufferValue valueWithSize = - ParquetValueParser.parseColumnValueToParquet( - value, - column, - parquetColumn.type, - forkedStats, - defaultTimezone, - insertRowsCurrIndex, - clientBufferParameters.isEnableNewJsonParsingLogic()); + ParquetBufferValue valueWithSize = + (clientBufferParameters.getIsIcebergMode() + ? IcebergParquetValueParser.parseColumnValueToParquet( + value, parquetColumn.type, forkedStats, defaultTimezone, insertRowsCurrIndex) + : SnowflakeParquetValueParser.parseColumnValueToParquet( + value, + column, + parquetColumn.type.asPrimitiveType().getPrimitiveTypeName(), + forkedStats, + defaultTimezone, + insertRowsCurrIndex, + clientBufferParameters.isEnableNewJsonParsingLogic())); indexedRow[colIndex] = valueWithSize.getValue(); size += valueWithSize.getSize(); } @@ -264,6 +266,10 @@ Object getVectorValueAt(String column, int index) { if (logicalType == ColumnLogicalType.BINARY && value != null) { value = value instanceof String ? ((String) value).getBytes(StandardCharsets.UTF_8) : value; } + /* Mismatch between Iceberg string & FDN String */ + if (Objects.equals(columnMetadata.getSourceIcebergDataType(), "\"string\"")) { + value = value instanceof byte[] ? 
new String((byte[]) value, StandardCharsets.UTF_8) : value; + } return value; } @@ -289,17 +295,4 @@ public Flusher createFlusher() { clientBufferParameters.getMaxChunkSizeInBytes(), clientBufferParameters.getBdecParquetCompression()); } - - private static class ParquetColumn { - final ColumnMetadata columnMetadata; - final int index; - final PrimitiveType.PrimitiveTypeName type; - - private ParquetColumn( - ColumnMetadata columnMetadata, int index, PrimitiveType.PrimitiveTypeName type) { - this.columnMetadata = columnMetadata; - this.index = index; - this.type = type; - } - } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetTypeGenerator.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetTypeGenerator.java index c6d0e87c3..d4b70f397 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetTypeGenerator.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetTypeGenerator.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022 Snowflake Computing Inc. All rights reserved. + * Copyright (c) 2022-2024 Snowflake Computing Inc. All rights reserved. */ package net.snowflake.ingest.streaming.internal; @@ -10,6 +10,7 @@ import java.util.Map; import java.util.Set; import net.snowflake.ingest.utils.ErrorCode; +import net.snowflake.ingest.utils.IcebergDataTypeParser; import net.snowflake.ingest.utils.SFException; import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.PrimitiveType; @@ -19,35 +20,6 @@ /** Generates the Parquet types for the Snowflake's column types */ public class ParquetTypeGenerator { - /** - * Util class that contains Parquet type and other metadata for that type needed by the Snowflake - * server side scanner - */ - static class ParquetTypeInfo { - private Type parquetType; - private Map metadata; - - public Type getParquetType() { - return this.parquetType; - } - - public Map getMetadata() { - return this.metadata; - } - - public void setParquetType(Type parquetType) { - this.parquetType = parquetType; - } - - public void setMetadata(Map metadata) { - this.metadata = metadata; - } - - public PrimitiveType.PrimitiveTypeName getPrimitiveTypeName() { - return parquetType.asPrimitiveType().getPrimitiveTypeName(); - } - } - private static final Set TIME_SUPPORTED_PHYSICAL_TYPES = new HashSet<>( Arrays.asList( @@ -69,23 +41,10 @@ public PrimitiveType.PrimitiveTypeName getPrimitiveTypeName() { */ static ParquetTypeInfo generateColumnParquetTypeInfo(ColumnMetadata column, int id) { id = column.getOrdinal() == null ? id : column.getOrdinal(); - ParquetTypeInfo res = new ParquetTypeInfo(); Type parquetType; Map metadata = new HashMap<>(); String name = column.getInternalName(); - AbstractRowBuffer.ColumnPhysicalType physicalType; - AbstractRowBuffer.ColumnLogicalType logicalType; - try { - physicalType = AbstractRowBuffer.ColumnPhysicalType.valueOf(column.getPhysicalType()); - logicalType = AbstractRowBuffer.ColumnLogicalType.valueOf(column.getLogicalType()); - } catch (IllegalArgumentException e) { - throw new SFException( - ErrorCode.UNKNOWN_DATA_TYPE, column.getLogicalType(), column.getPhysicalType()); - } - - metadata.put(Integer.toString(id), logicalType.getOrdinal() + "," + physicalType.getOrdinal()); - // Parquet Type.Repetition in general supports repeated values for the same row column, like a // list of values. 
// This generator uses only either 0 or 1 value for nullable data type (OPTIONAL: 0 or none @@ -94,73 +53,94 @@ static ParquetTypeInfo generateColumnParquetTypeInfo(ColumnMetadata column, int Type.Repetition repetition = column.getNullable() ? Type.Repetition.OPTIONAL : Type.Repetition.REQUIRED; - // Handle differently depends on the column logical and physical types - switch (logicalType) { - case FIXED: - parquetType = getFixedColumnParquetType(column, id, physicalType, repetition); - break; - case ARRAY: - case OBJECT: - case VARIANT: - // mark the column metadata as being an object json for the server side scanner - metadata.put(id + ":obj_enc", "1"); - // parquetType is same as the next one - case ANY: - case CHAR: - case TEXT: - case BINARY: - parquetType = - Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, repetition) - .as(LogicalTypeAnnotation.stringType()) - .id(id) - .named(name); - break; - case TIMESTAMP_LTZ: - case TIMESTAMP_NTZ: - case TIMESTAMP_TZ: - parquetType = - getTimeColumnParquetType( - column.getScale(), - physicalType, - logicalType, - TIMESTAMP_SUPPORTED_PHYSICAL_TYPES, - repetition, - id, - name); - break; - case DATE: - parquetType = - Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition) - .as(LogicalTypeAnnotation.dateType()) - .id(id) - .named(name); - break; - case TIME: - parquetType = - getTimeColumnParquetType( - column.getScale(), - physicalType, - logicalType, - TIME_SUPPORTED_PHYSICAL_TYPES, - repetition, - id, - name); - break; - case BOOLEAN: - parquetType = - Types.primitive(PrimitiveType.PrimitiveTypeName.BOOLEAN, repetition).id(id).named(name); - break; - case REAL: - parquetType = - Types.primitive(PrimitiveType.PrimitiveTypeName.DOUBLE, repetition).id(id).named(name); - break; - default: + if (column.getSourceIcebergDataType() != null) { + parquetType = + IcebergDataTypeParser.parseIcebergDataTypeStringToParquetType( + column.getSourceIcebergDataType(), repetition, id, name); + } else { + AbstractRowBuffer.ColumnPhysicalType physicalType; + AbstractRowBuffer.ColumnLogicalType logicalType; + try { + physicalType = AbstractRowBuffer.ColumnPhysicalType.valueOf(column.getPhysicalType()); + logicalType = AbstractRowBuffer.ColumnLogicalType.valueOf(column.getLogicalType()); + } catch (IllegalArgumentException e) { throw new SFException( ErrorCode.UNKNOWN_DATA_TYPE, column.getLogicalType(), column.getPhysicalType()); + } + + metadata.put( + Integer.toString(id), logicalType.getOrdinal() + "," + physicalType.getOrdinal()); + + // Handle differently depends on the column logical and physical types + switch (logicalType) { + case FIXED: + parquetType = getFixedColumnParquetType(column, id, physicalType, repetition); + break; + case ARRAY: + case OBJECT: + case VARIANT: + // mark the column metadata as being an object json for the server side scanner + metadata.put(id + ":obj_enc", "1"); + // parquetType is same as the next one + case ANY: + case CHAR: + case TEXT: + case BINARY: + parquetType = + Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, repetition) + .as(LogicalTypeAnnotation.stringType()) + .id(id) + .named(name); + break; + case TIMESTAMP_LTZ: + case TIMESTAMP_NTZ: + case TIMESTAMP_TZ: + parquetType = + getTimeColumnParquetType( + column.getScale(), + physicalType, + logicalType, + TIMESTAMP_SUPPORTED_PHYSICAL_TYPES, + repetition, + id, + name); + break; + case DATE: + parquetType = + Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition) + .as(LogicalTypeAnnotation.dateType()) + .id(id) + .named(name); + 
break; + case TIME: + parquetType = + getTimeColumnParquetType( + column.getScale(), + physicalType, + logicalType, + TIME_SUPPORTED_PHYSICAL_TYPES, + repetition, + id, + name); + break; + case BOOLEAN: + parquetType = + Types.primitive(PrimitiveType.PrimitiveTypeName.BOOLEAN, repetition) + .id(id) + .named(name); + break; + case REAL: + parquetType = + Types.primitive(PrimitiveType.PrimitiveTypeName.DOUBLE, repetition) + .id(id) + .named(name); + break; + default: + throw new SFException( + ErrorCode.UNKNOWN_DATA_TYPE, column.getLogicalType(), column.getPhysicalType()); + } } - res.setParquetType(parquetType); - res.setMetadata(metadata); - return res; + return new ParquetTypeInfo(parquetType, metadata); } /** diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetTypeInfo.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetTypeInfo.java new file mode 100644 index 000000000..9e98bbcc9 --- /dev/null +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetTypeInfo.java @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + +package net.snowflake.ingest.streaming.internal; + +import java.util.Map; +import org.apache.parquet.schema.Type; + +/** + * Util class that contains Parquet type and other metadata for that type needed by the Snowflake + * server side scanner + */ +class ParquetTypeInfo { + private final Type parquetType; + private final Map metadata; + + ParquetTypeInfo(Type parquetType, Map metadata) { + this.parquetType = parquetType; + this.metadata = metadata; + } + + public Type getParquetType() { + return this.parquetType; + } + + public Map getMetadata() { + return this.metadata; + } +} diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/RegisterBlobRequest.java b/src/main/java/net/snowflake/ingest/streaming/internal/RegisterBlobRequest.java index fcb7edf4f..79a91943d 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/RegisterBlobRequest.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/RegisterBlobRequest.java @@ -4,6 +4,7 @@ package net.snowflake.ingest.streaming.internal; +import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import java.util.List; import java.util.stream.Collectors; @@ -19,10 +20,15 @@ class RegisterBlobRequest implements IStreamingIngestRequest { @JsonProperty("blobs") private List blobs; - RegisterBlobRequest(String requestId, String role, List blobs) { + @JsonInclude(JsonInclude.Include.NON_NULL) + @JsonProperty("is_iceberg") + private boolean isIceberg; + + RegisterBlobRequest(String requestId, String role, List blobs, boolean isIceberg) { this.requestId = requestId; this.role = role; this.blobs = blobs; + this.isIceberg = isIceberg; } String getRequestId() { @@ -37,6 +43,10 @@ List getBlobs() { return blobs; } + boolean getIsIceberg() { + return isIceberg; + } + @Override public String getStringForLogging() { return String.format( diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetValueParser.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeParquetValueParser.java similarity index 86% rename from src/main/java/net/snowflake/ingest/streaming/internal/ParquetValueParser.java rename to src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeParquetValueParser.java index 298ec2ba2..ba4a38b68 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetValueParser.java +++ 
b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeParquetValueParser.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022 Snowflake Computing Inc. All rights reserved. + * Copyright (c) 2022-2024 Snowflake Computing Inc. All rights reserved. */ package net.snowflake.ingest.streaming.internal; @@ -15,58 +15,8 @@ import net.snowflake.ingest.utils.Utils; import org.apache.parquet.schema.PrimitiveType; -/** Parses a user column value into Parquet internal representation for buffering. */ -class ParquetValueParser { - - // Parquet uses BitPacking to encode boolean, hence 1 bit per value - public static final float BIT_ENCODING_BYTE_LEN = 1.0f / 8; - - /** - * On average parquet needs 2 bytes / 8 values for the RLE+bitpack encoded definition level. - * - *
-   * <p>There are two cases how definition level (0 for null values, 1 for non-null values) is
-   * encoded:
-   *
-   * <ul>
-   *   <li>If there are at least 8 repeated values in a row, they are run-length encoded (length +
-   *       value itself). E.g. 11111111 -> 8 1
-   *   <li>If there are less than 8 repeated values, they are written in group as part of a
-   *       bit-length encoded run, e.g. 1111 -> 15 A bit-length encoded run ends when either 64
-   *       groups of 8 values have been written or if a new RLE run starts.
-   * </ul>
-   *
-   * <p>To distinguish between RLE and bitpack run, there is 1 extra bytes written as header
-   * when a bitpack run starts.
-   *
-   * <p>For more details see ColumnWriterV1#createDLWriter and {@link
-   * org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder#writeInt(int)}
-   *
-   * <p>
Since we don't have nested types, repetition level is always 0 and is not stored at all by - * Parquet. - */ - public static final float DEFINITION_LEVEL_ENCODING_BYTE_LEN = 2.0f / 8; - - // Parquet stores length in 4 bytes before the actual data bytes - public static final int BYTE_ARRAY_LENGTH_ENCODING_BYTE_LEN = 4; - - /** Parquet internal value representation for buffering. */ - static class ParquetBufferValue { - private final Object value; - private final float size; - - ParquetBufferValue(Object value, float size) { - this.value = value; - this.size = size; - } - - Object getValue() { - return value; - } - - float getSize() { - return size; - } - } +/** Parses a user Snowflake column value into Parquet internal representation for buffering. */ +class SnowflakeParquetValueParser { /** * Parses a user column value into Parquet internal representation for buffering. @@ -89,7 +39,7 @@ static ParquetBufferValue parseColumnValueToParquet( boolean enableNewJsonParsingLogic) { Utils.assertNotNull("Parquet column stats", stats); float estimatedParquetSize = 0F; - estimatedParquetSize += DEFINITION_LEVEL_ENCODING_BYTE_LEN; + estimatedParquetSize += ParquetBufferValue.DEFINITION_LEVEL_ENCODING_BYTE_LEN; if (value != null) { AbstractRowBuffer.ColumnLogicalType logicalType = AbstractRowBuffer.ColumnLogicalType.valueOf(columnMetadata.getLogicalType()); @@ -102,7 +52,7 @@ static ParquetBufferValue parseColumnValueToParquet( columnMetadata.getName(), value, insertRowsCurrIndex); value = intValue > 0; stats.addIntValue(BigInteger.valueOf(intValue)); - estimatedParquetSize += BIT_ENCODING_BYTE_LEN; + estimatedParquetSize += ParquetBufferValue.BIT_ENCODING_BYTE_LEN; break; case INT32: int intVal = @@ -157,7 +107,8 @@ static ParquetBufferValue parseColumnValueToParquet( } } if (value != null) { - estimatedParquetSize += (BYTE_ARRAY_LENGTH_ENCODING_BYTE_LEN + length); + estimatedParquetSize += + (ParquetBufferValue.BYTE_ARRAY_LENGTH_ENCODING_BYTE_LEN + length); } break; @@ -415,6 +366,7 @@ private static String getBinaryValue( } return str; } + /** * Converts a binary value to its byte array representation. 
* diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java index be6b192b0..b476859d8 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java @@ -335,6 +335,7 @@ public SnowflakeStreamingIngestChannelInternal openChannel(OpenChannelRequest request.getFullyQualifiedTableName(), getName()); + OpenChannelResponse response = null; try { OpenChannelRequestInternal openChannelRequest = new OpenChannelRequestInternal( @@ -345,48 +346,56 @@ public SnowflakeStreamingIngestChannelInternal openChannel(OpenChannelRequest request.getTableName(), request.getChannelName(), Constants.WriteMode.CLOUD_STORAGE, + this.isIcebergMode, request.getOffsetToken()); - OpenChannelResponse response = snowflakeServiceClient.openChannel(openChannelRequest); - - logger.logInfo( - "Open channel request succeeded, channel={}, table={}, clientSequencer={}," - + " rowSequencer={}, client={}", - request.getChannelName(), - request.getFullyQualifiedTableName(), - response.getClientSequencer(), - response.getRowSequencer(), - getName()); - - // Channel is now registered, add it to the in-memory channel pool - SnowflakeStreamingIngestChannelInternal channel = - SnowflakeStreamingIngestChannelFactory.builder(response.getChannelName()) - .setDBName(response.getDBName()) - .setSchemaName(response.getSchemaName()) - .setTableName(response.getTableName()) - .setOffsetToken(response.getOffsetToken()) - .setRowSequencer(response.getRowSequencer()) - .setChannelSequencer(response.getClientSequencer()) - .setOwningClient(this) - .setEncryptionKey(response.getEncryptionKey()) - .setEncryptionKeyId(response.getEncryptionKeyId()) - .setOnErrorOption(request.getOnErrorOption()) - .setDefaultTimezone(request.getDefaultTimezone()) - .setOffsetTokenVerificationFunction(request.getOffsetTokenVerificationFunction()) - .build(); - - // Setup the row buffer schema - channel.setupSchema(response.getTableColumns()); - - // Add channel to the channel cache - this.channelCache.addChannel(channel); - this.storageManager.registerTable( - new TableRef(response.getDBName(), response.getSchemaName(), response.getTableName()), - response.getExternalVolumeLocation()); - - return channel; + response = snowflakeServiceClient.openChannel(openChannelRequest); } catch (IOException | IngestResponseException e) { throw new SFException(e, ErrorCode.OPEN_CHANNEL_FAILURE, e.getMessage()); } + + if (isIcebergMode + && response.getTableColumns().stream() + .anyMatch(c -> c.getSourceIcebergDataType() == null)) { + throw new SFException( + ErrorCode.INTERNAL_ERROR, "Iceberg table columns must have sourceIcebergDataType set."); + } + + logger.logInfo( + "Open channel request succeeded, channel={}, table={}, clientSequencer={}," + + " rowSequencer={}, client={}", + request.getChannelName(), + request.getFullyQualifiedTableName(), + response.getClientSequencer(), + response.getRowSequencer(), + getName()); + + // Channel is now registered, add it to the in-memory channel pool + SnowflakeStreamingIngestChannelInternal channel = + SnowflakeStreamingIngestChannelFactory.builder(response.getChannelName()) + .setDBName(response.getDBName()) + .setSchemaName(response.getSchemaName()) + .setTableName(response.getTableName()) + .setOffsetToken(response.getOffsetToken()) + 
.setRowSequencer(response.getRowSequencer()) + .setChannelSequencer(response.getClientSequencer()) + .setOwningClient(this) + .setEncryptionKey(response.getEncryptionKey()) + .setEncryptionKeyId(response.getEncryptionKeyId()) + .setOnErrorOption(request.getOnErrorOption()) + .setDefaultTimezone(request.getDefaultTimezone()) + .setOffsetTokenVerificationFunction(request.getOffsetTokenVerificationFunction()) + .build(); + + // Setup the row buffer schema + channel.setupSchema(response.getTableColumns()); + + // Add channel to the channel cache + this.channelCache.addChannel(channel); + this.storageManager.registerTable( + new TableRef(response.getDBName(), response.getSchemaName(), response.getTableName()), + response.getExternalVolumeLocation()); + + return channel; } @Override @@ -568,7 +577,8 @@ void registerBlobs(List blobs, final int executionCount) { new RegisterBlobRequest( this.storageManager.getClientPrefix() + "_" + counter.getAndIncrement(), this.role, - blobs); + blobs, + this.isIcebergMode); response = snowflakeServiceClient.registerBlob(request, executionCount); } catch (IOException | IngestResponseException e) { throw new SFException(e, ErrorCode.REGISTER_BLOB_FAILURE, e.getMessage()); @@ -915,6 +925,11 @@ public void setRefreshToken(String refreshToken) { } } + /** Return whether the client is streaming to Iceberg tables */ + boolean isIcebergMode() { + return isIcebergMode; + } + /** * Registers the performance metrics along with JVM memory and Threads. * diff --git a/src/main/java/net/snowflake/ingest/utils/Constants.java b/src/main/java/net/snowflake/ingest/utils/Constants.java index 534a1815e..35eed3469 100644 --- a/src/main/java/net/snowflake/ingest/utils/Constants.java +++ b/src/main/java/net/snowflake/ingest/utils/Constants.java @@ -62,6 +62,8 @@ public class Constants { public static final int STREAMING_INGEST_TELEMETRY_UPLOAD_INTERVAL_IN_SEC = 10; public static final long EP_NDV_UNKNOWN = -1L; public static final int MAX_OAUTH_REFRESH_TOKEN_RETRY = 3; + public static final int BINARY_COLUMN_MAX_SIZE = 8 * 1024 * 1024; + public static final int VARCHAR_COLUMN_MAX_SIZE = 16 * 1024 * 1024; // Channel level constants public static final String CHANNEL_STATUS_ENDPOINT = "/v1/streaming/channels/status/"; diff --git a/src/main/java/net/snowflake/ingest/utils/IcebergDataTypeParser.java b/src/main/java/net/snowflake/ingest/utils/IcebergDataTypeParser.java new file mode 100644 index 000000000..95e484b70 --- /dev/null +++ b/src/main/java/net/snowflake/ingest/utils/IcebergDataTypeParser.java @@ -0,0 +1,198 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + +package net.snowflake.ingest.utils; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import java.util.Iterator; +import java.util.List; +import javax.annotation.Nonnull; +import org.apache.iceberg.parquet.TypeToMessageType; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.JsonUtil; + +/** + * This class is used to Iceberg data type (include primitive types and nested types) serialization + * and deserialization. + * + *
+ * <p>
This code is modified from + * GlobalServices/modules/data-lake/datalake-api/src/main/java/com/snowflake/metadata/iceberg + * /IcebergDataTypeParser.java + */ +public class IcebergDataTypeParser { + private static final String TYPE = "type"; + private static final String STRUCT = "struct"; + private static final String LIST = "list"; + private static final String MAP = "map"; + private static final String FIELDS = "fields"; + private static final String ELEMENT = "element"; + private static final String KEY = "key"; + private static final String VALUE = "value"; + private static final String DOC = "doc"; + private static final String NAME = "name"; + private static final String ID = "id"; + private static final String ELEMENT_ID = "element-id"; + private static final String KEY_ID = "key-id"; + private static final String VALUE_ID = "value-id"; + private static final String REQUIRED = "required"; + private static final String ELEMENT_REQUIRED = "element-required"; + private static final String VALUE_REQUIRED = "value-required"; + + /** Object mapper for this class */ + private static final ObjectMapper MAPPER = new ObjectMapper(); + + /** Util class that contains the mapping between Iceberg data type and Parquet data type */ + private static final TypeToMessageType typeToMessageType = new TypeToMessageType(); + + /** + * Get Iceberg data type information by deserialization. + * + * @param icebergDataType string representation of Iceberg data type + * @param repetition repetition of the Parquet data type + * @param id column id + * @param name column name + * @return Iceberg data type + */ + public static org.apache.parquet.schema.Type parseIcebergDataTypeStringToParquetType( + String icebergDataType, + org.apache.parquet.schema.Type.Repetition repetition, + int id, + String name) { + Type icebergType = deserializeIcebergType(icebergDataType); + if (!icebergType.isPrimitiveType()) { + throw new IllegalArgumentException( + String.format("Snowflake supports only primitive Iceberg types, got '%s'", icebergType)); + } + return typeToMessageType.primitive(icebergType.asPrimitiveType(), repetition, id, name); + } + + /** + * Get Iceberg data type information by deserialization. + * + * @param icebergDataType string representation of Iceberg data type + * @return Iceberg data type + */ + public static Type deserializeIcebergType(String icebergDataType) { + try { + JsonNode json = MAPPER.readTree(icebergDataType); + return getTypeFromJson(json); + } catch (JsonProcessingException e) { + throw new IllegalArgumentException( + String.format("Failed to deserialize Iceberg data type: %s", icebergDataType)); + } + } + + /** + * Get corresponding Iceberg data type from JsonNode. + * + * @param jsonNode JsonNode parsed from Iceberg type string. 
+   * @return Iceberg data type
+   */
+  public static Type getTypeFromJson(@Nonnull JsonNode jsonNode) {
+    if (jsonNode.isTextual()) {
+      return Types.fromPrimitiveString(jsonNode.asText());
+    } else if (jsonNode.isObject()) {
+      if (!jsonNode.has(TYPE)) {
+        throw new IllegalArgumentException(
+            String.format("Missing key '%s' in schema: %s", TYPE, jsonNode));
+      }
+      String type = jsonNode.get(TYPE).asText();
+      if (STRUCT.equals(type)) {
+        return structFromJson(jsonNode);
+      } else if (LIST.equals(type)) {
+        return listFromJson(jsonNode);
+      } else if (MAP.equals(type)) {
+        return mapFromJson(jsonNode);
+      }
+      throw new IllegalArgumentException(
+          String.format("Cannot parse Iceberg type: %s, schema: %s", type, jsonNode));
+    }
+
+    throw new IllegalArgumentException("Cannot parse Iceberg type from schema: " + jsonNode);
+  }
+
+  /**
+   * Get Iceberg struct type information from JsonNode.
+   *
+   * @param json JsonNode parsed from Iceberg type string.
+   * @return struct type
+   */
+  public static @Nonnull Types.StructType structFromJson(@Nonnull JsonNode json) {
+    if (!json.has(FIELDS)) {
+      throw new IllegalArgumentException(
+          String.format("Missing key '%s' in schema: %s", FIELDS, json));
+    }
+    JsonNode fieldArray = json.get(FIELDS);
+    Preconditions.checkArgument(fieldArray != null, "Field array cannot be null");
+    Preconditions.checkArgument(
+        fieldArray.isArray(), "Cannot parse struct fields from non-array: %s", fieldArray);
+
+    List<Types.NestedField> fields = Lists.newArrayListWithExpectedSize(fieldArray.size());
+    Iterator<JsonNode> iterator = fieldArray.elements();
+    while (iterator.hasNext()) {
+      JsonNode field = iterator.next();
+      Preconditions.checkArgument(
+          field.isObject(), "Cannot parse struct field from non-object: %s", field);
+
+      int id = JsonUtil.getInt(ID, field);
+      String name = JsonUtil.getString(NAME, field);
+      Type type = getTypeFromJson(field.get(TYPE));
+
+      String doc = JsonUtil.getStringOrNull(DOC, field);
+      boolean isRequired = JsonUtil.getBool(REQUIRED, field);
+      if (isRequired) {
+        fields.add(Types.NestedField.required(id, name, type, doc));
+      } else {
+        fields.add(Types.NestedField.optional(id, name, type, doc));
+      }
+    }
+
+    return Types.StructType.of(fields);
+  }
+
+  /**
+   * Get Iceberg list type information from JsonNode.
+   *
+   * @param json JsonNode parsed from Iceberg type string.
+   * @return list type
+   */
+  public static Types.ListType listFromJson(JsonNode json) {
+    int elementId = JsonUtil.getInt(ELEMENT_ID, json);
+    Type elementType = getTypeFromJson(json.get(ELEMENT));
+    boolean isRequired = JsonUtil.getBool(ELEMENT_REQUIRED, json);
+
+    if (isRequired) {
+      return Types.ListType.ofRequired(elementId, elementType);
+    } else {
+      return Types.ListType.ofOptional(elementId, elementType);
+    }
+  }
+
+  /**
+   * Get Iceberg map type from JsonNode.
+   *
+   * @param json JsonNode parsed from Iceberg type string.
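+   *     For example (editorial note), a schema such as
+   *     {"type":"map","key-id":4,"key":"int","value-id":5,"value":"string","value-required":false}
+   *     parses to Types.MapType.ofOptional(4, 5, int, string).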
+   * @return map type
+   */
+  public static Types.MapType mapFromJson(JsonNode json) {
+    int keyId = JsonUtil.getInt(KEY_ID, json);
+    Type keyType = getTypeFromJson(json.get(KEY));
+
+    int valueId = JsonUtil.getInt(VALUE_ID, json);
+    Type valueType = getTypeFromJson(json.get(VALUE));
+
+    boolean isRequired = JsonUtil.getBool(VALUE_REQUIRED, json);
+
+    if (isRequired) {
+      return Types.MapType.ofRequired(keyId, valueId, keyType, valueType);
+    } else {
+      return Types.MapType.ofOptional(keyId, valueId, keyType, valueType);
+    }
+  }
+}
diff --git a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java
index 885314b01..3d04aa1b2 100644
--- a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java
+++ b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java
@@ -79,6 +79,8 @@ public class ParameterProvider {
 
   public static final boolean ENABLE_NEW_JSON_PARSING_LOGIC_DEFAULT = true;
 
+  public static final boolean IS_ICEBERG_MODE_DEFAULT = false;
+
   /** Map of parameter name to parameter value. This will be set by client/configure API Call. */
   private final Map<String, Object> parameterMap = new HashMap<>();
 
diff --git a/src/main/java/org/apache/parquet/hadoop/BdecParquetWriter.java b/src/main/java/org/apache/parquet/hadoop/BdecParquetWriter.java
index 58e7df4f3..eb5c20f03 100644
--- a/src/main/java/org/apache/parquet/hadoop/BdecParquetWriter.java
+++ b/src/main/java/org/apache/parquet/hadoop/BdecParquetWriter.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2022 Snowflake Computing Inc. All rights reserved.
+ * Copyright (c) 2022-2024 Snowflake Computing Inc. All rights reserved.
  */
 package org.apache.parquet.hadoop;
 
@@ -25,8 +25,10 @@
 import org.apache.parquet.io.PositionOutputStream;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.io.api.RecordConsumer;
+import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
 
 /**
  * BDEC specific parquet writer.
@@ -278,51 +280,58 @@ public void write(List values) {
               + ") : "
               + values);
     }
-    recordConsumer.startMessage();
+    writeValues(values, schema);
+    recordConsumer.endMessage();
+  }
+
+  private void writeValues(List<Object> values, GroupType type) {
+    List<Type> cols = type.getFields();
     for (int i = 0; i < cols.size(); ++i) {
       Object val = values.get(i);
-      // val.length() == 0 indicates a NULL value.
       if (val != null) {
-        String fieldName = cols.get(i).getPath()[0];
+        String fieldName = cols.get(i).getName();
         recordConsumer.startField(fieldName, i);
-        PrimitiveType.PrimitiveTypeName typeName =
-            cols.get(i).getPrimitiveType().getPrimitiveTypeName();
-        switch (typeName) {
-          case BOOLEAN:
-            recordConsumer.addBoolean((boolean) val);
-            break;
-          case FLOAT:
-            recordConsumer.addFloat((float) val);
-            break;
-          case DOUBLE:
-            recordConsumer.addDouble((double) val);
-            break;
-          case INT32:
-            recordConsumer.addInteger((int) val);
-            break;
-          case INT64:
-            recordConsumer.addLong((long) val);
-            break;
-          case BINARY:
-            Binary binVal =
-                val instanceof String
-                    ?
Binary.fromString((String) val) - : Binary.fromConstantByteArray((byte[]) val); - recordConsumer.addBinary(binVal); - break; - case FIXED_LEN_BYTE_ARRAY: - Binary binary = Binary.fromConstantByteArray((byte[]) val); - recordConsumer.addBinary(binary); - break; - default: - throw new ParquetEncodingException( - "Unsupported column type: " + cols.get(i).getPrimitiveType()); + if (cols.get(i).isPrimitive()) { + PrimitiveType.PrimitiveTypeName typeName = + cols.get(i).asPrimitiveType().getPrimitiveTypeName(); + switch (typeName) { + case BOOLEAN: + recordConsumer.addBoolean((boolean) val); + break; + case FLOAT: + recordConsumer.addFloat((float) val); + break; + case DOUBLE: + recordConsumer.addDouble((double) val); + break; + case INT32: + recordConsumer.addInteger((int) val); + break; + case INT64: + recordConsumer.addLong((long) val); + break; + case BINARY: + Binary binVal = + val instanceof String + ? Binary.fromString((String) val) + : Binary.fromConstantByteArray((byte[]) val); + recordConsumer.addBinary(binVal); + break; + case FIXED_LEN_BYTE_ARRAY: + Binary binary = Binary.fromConstantByteArray((byte[]) val); + recordConsumer.addBinary(binary); + break; + default: + throw new ParquetEncodingException( + "Unsupported column type: " + cols.get(i).asPrimitiveType()); + } + } else { + throw new ParquetEncodingException("Unsupported column type: " + cols.get(i)); } recordConsumer.endField(fieldName, i); } } - recordConsumer.endMessage(); } } } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java index 62a6b7a4b..1cc3a2953 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java @@ -96,7 +96,7 @@ private List> createChannelDataPerTable(int metada } private static MessageType createSchema(String columnName) { - ParquetTypeGenerator.ParquetTypeInfo c1 = + ParquetTypeInfo c1 = ParquetTypeGenerator.generateColumnParquetTypeInfo(createTestTextColumn(columnName), 1); return new MessageType("bdec", Collections.singletonList(c1.getParquetType())); } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/ColumnMetadataBuilder.java b/src/test/java/net/snowflake/ingest/streaming/internal/ColumnMetadataBuilder.java index 01fd44918..4ad62f0f7 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/ColumnMetadataBuilder.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/ColumnMetadataBuilder.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022 Snowflake Computing Inc. All rights reserved. + * Copyright (c) 2022-2024 Snowflake Computing Inc. All rights reserved. 
 */
 package net.snowflake.ingest.streaming.internal;
 
@@ -16,6 +16,7 @@ public class ColumnMetadataBuilder {
   private Integer length;
   private boolean nullable;
   private String collation;
+  private String sourceIcebergDataType;
 
   private Integer ordinal;
 
@@ -144,6 +145,17 @@ public ColumnMetadataBuilder collation(String collation) {
     return this;
   }
 
+  /**
+   * Set column source Iceberg data type
+   *
+   * @param sourceIcebergDataType source Iceberg data type string
+   * @return columnMetadataBuilder object
+   */
+  public ColumnMetadataBuilder sourceIcebergDataType(String sourceIcebergDataType) {
+    this.sourceIcebergDataType = sourceIcebergDataType;
+    return this;
+  }
+
   /**
    * Set column ordinal
    *
@@ -172,6 +184,7 @@ public ColumnMetadata build() {
     colMetadata.setScale(scale);
     colMetadata.setPrecision(precision);
     colMetadata.setCollation(collation);
+    colMetadata.setSourceIcebergDataType(sourceIcebergDataType);
     colMetadata.setOrdinal(ordinal);
     return colMetadata;
   }
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/DataValidationUtilTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/DataValidationUtilTest.java
index b92cc6e5e..0e738a4b3 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/DataValidationUtilTest.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/DataValidationUtilTest.java
@@ -1,3 +1,7 @@
+/*
+ * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
+ */
+
 package net.snowflake.ingest.streaming.internal;
 
 import static java.time.ZoneOffset.UTC;
@@ -11,6 +15,8 @@
 import static net.snowflake.ingest.streaming.internal.DataValidationUtil.validateAndParseBinary;
 import static net.snowflake.ingest.streaming.internal.DataValidationUtil.validateAndParseBoolean;
 import static net.snowflake.ingest.streaming.internal.DataValidationUtil.validateAndParseDate;
+import static net.snowflake.ingest.streaming.internal.DataValidationUtil.validateAndParseIcebergInt;
+import static net.snowflake.ingest.streaming.internal.DataValidationUtil.validateAndParseIcebergLong;
 import static net.snowflake.ingest.streaming.internal.DataValidationUtil.validateAndParseObject;
 import static net.snowflake.ingest.streaming.internal.DataValidationUtil.validateAndParseObjectNew;
 import static net.snowflake.ingest.streaming.internal.DataValidationUtil.validateAndParseReal;
@@ -1190,6 +1196,91 @@ public void testValidateAndParseBoolean() {
     expectError(ErrorCode.INVALID_VALUE_ROW, () -> validateAndParseBoolean("COL", "", 0));
   }
 
+  @Test
+  public void testValidateAndParseIcebergInt() {
+    assertEquals(1, validateAndParseIcebergInt("COL", 1, 0));
+    assertEquals(1, validateAndParseIcebergInt("COL", 1L, 0));
+    assertEquals(1, validateAndParseIcebergInt("COL", 1.499f, 0));
+    assertEquals(0, validateAndParseIcebergInt("COL", -.0f, 0));
+    assertEquals(1, validateAndParseIcebergInt("COL", 1.0d, 0));
+    assertEquals(1, validateAndParseIcebergInt("COL", "1", 0));
+    assertEquals(1, validateAndParseIcebergInt("COL", " 1 \t\n", 0));
+    assertEquals(1, validateAndParseIcebergInt("COL", "0.5", 0));
+    assertEquals(1, validateAndParseIcebergInt("COL", "1.0e0", 0));
+    assertEquals(1, validateAndParseIcebergInt("COL", "1.0e+0", 0));
+    assertEquals(1, validateAndParseIcebergInt("COL", "1.0e-0", 0));
+    assertEquals(10, validateAndParseIcebergInt("COL", "1.0e1", 0));
+    assertEquals(10, validateAndParseIcebergInt("COL", "1.0e+1", 0));
+    assertEquals(0, validateAndParseIcebergInt("COL", "1.0e-1", 0));
+    assertEquals(1, validateAndParseIcebergInt("COL", new BigDecimal("1.4"), 0));
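+    // Editorial note: fractional inputs appear to be rounded half-up (ties away from zero), so
+    // values within .499 of the int boundaries below still fit, while .5 and beyond is rejected.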
assertEquals(Integer.MAX_VALUE, validateAndParseIcebergInt("COL", "2147483647.499", 0)); + assertEquals(Integer.MIN_VALUE, validateAndParseIcebergInt("COL", "-2147483648.499", 0)); + + // Test forbidden values + expectError(ErrorCode.INVALID_FORMAT_ROW, () -> validateAndParseIcebergInt("COL", 'c', 0)); + expectError( + ErrorCode.INVALID_FORMAT_ROW, () -> validateAndParseIcebergInt("COL", new Object(), 0)); + expectError( + ErrorCode.INVALID_FORMAT_ROW, () -> validateAndParseIcebergInt("COL", new int[] {}, 0)); + expectError(ErrorCode.INVALID_VALUE_ROW, () -> validateAndParseIcebergInt("COL", "foo", 0)); + expectError( + ErrorCode.INVALID_VALUE_ROW, () -> validateAndParseIcebergInt("COL", "2147483647.5", 0)); + expectError( + ErrorCode.INVALID_VALUE_ROW, + () -> validateAndParseIcebergInt("COL", new BigDecimal("-2147483648.5"), 0)); + expectError( + ErrorCode.INVALID_VALUE_ROW, () -> validateAndParseIcebergInt("COL", Double.NaN, 0)); + expectError( + ErrorCode.INVALID_VALUE_ROW, + () -> validateAndParseIcebergInt("COL", Double.POSITIVE_INFINITY, 0)); + expectError( + ErrorCode.INVALID_VALUE_ROW, + () -> validateAndParseIcebergInt("COL", Float.NEGATIVE_INFINITY, 0)); + } + + @Test + public void testValidateAndParseIcebergLong() { + assertEquals(1L, validateAndParseIcebergLong("COL", 1, 0)); + assertEquals(1L, validateAndParseIcebergLong("COL", 1L, 0)); + assertEquals(1L, validateAndParseIcebergLong("COL", 1.499f, 0)); + assertEquals(0, validateAndParseIcebergLong("COL", -.0f, 0)); + assertEquals(1L, validateAndParseIcebergLong("COL", 1.0d, 0)); + assertEquals(1L, validateAndParseIcebergLong("COL", "1", 0)); + assertEquals(1L, validateAndParseIcebergLong("COL", " 1 \t\n", 0)); + assertEquals(1L, validateAndParseIcebergLong("COL", "0.5", 0)); + assertEquals(1L, validateAndParseIcebergLong("COL", "1.0e0", 0)); + assertEquals(1L, validateAndParseIcebergLong("COL", "1.0e+0", 0)); + assertEquals(1L, validateAndParseIcebergLong("COL", "1.0e-0", 0)); + assertEquals(10L, validateAndParseIcebergLong("COL", "1.0e1", 0)); + assertEquals(10L, validateAndParseIcebergLong("COL", "1.0e+1", 0)); + assertEquals(0L, validateAndParseIcebergLong("COL", "1.0e-1", 0)); + assertEquals(1L, validateAndParseIcebergLong("COL", new BigDecimal("1.4"), 0)); + assertEquals(Long.MAX_VALUE, validateAndParseIcebergLong("COL", "9223372036854775807.499", 0)); + assertEquals(Long.MIN_VALUE, validateAndParseIcebergLong("COL", "-9223372036854775808.499", 0)); + + // Test forbidden values + expectError(ErrorCode.INVALID_FORMAT_ROW, () -> validateAndParseIcebergLong("COL", 'c', 0)); + expectError( + ErrorCode.INVALID_FORMAT_ROW, () -> validateAndParseIcebergLong("COL", new Object(), 0)); + expectError( + ErrorCode.INVALID_FORMAT_ROW, () -> validateAndParseIcebergLong("COL", new int[] {}, 0)); + expectError(ErrorCode.INVALID_VALUE_ROW, () -> validateAndParseIcebergLong("COL", "foo", 0)); + expectError( + ErrorCode.INVALID_VALUE_ROW, + () -> validateAndParseIcebergLong("COL", "9223372036854775807.5", 0)); + expectError( + ErrorCode.INVALID_VALUE_ROW, + () -> validateAndParseIcebergLong("COL", new BigDecimal("-9223372036854775808.5"), 0)); + expectError( + ErrorCode.INVALID_VALUE_ROW, () -> validateAndParseIcebergLong("COL", Float.NaN, 0)); + expectError( + ErrorCode.INVALID_VALUE_ROW, + () -> validateAndParseIcebergLong("COL", Float.POSITIVE_INFINITY, 0)); + expectError( + ErrorCode.INVALID_VALUE_ROW, + () -> validateAndParseIcebergLong("COL", Double.NEGATIVE_INFINITY, 0)); + } + /** * Tests that exception message are constructed 
correctly when ingesting a forbidden Java type, as
 * well as a value of an allowed type, but in an invalid format
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/IcebergDataTypeParserTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/IcebergDataTypeParserTest.java
new file mode 100644
index 000000000..e12d2f4f7
--- /dev/null
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/IcebergDataTypeParserTest.java
@@ -0,0 +1,107 @@
+/*
+ * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
+ */
+
+package net.snowflake.ingest.streaming.internal;
+
+import java.util.ArrayList;
+import java.util.List;
+import net.snowflake.ingest.utils.IcebergDataTypeParser;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/** Test for Iceberg data type serialization and deserialization. */
+public class IcebergDataTypeParserTest {
+  private class DataTypeInfo {
+
+    // JSON representation of the Iceberg data type
+    String jsonStr;
+    Type icebergType;
+
+    DataTypeInfo(String jsonStr, Type icebergType) {
+      this.jsonStr = jsonStr;
+      this.icebergType = icebergType;
+    }
+  }
+
+  private int fieldId = 0;
+
+  List<DataTypeInfo> dataTypesToTest;
+
+  @Before
+  public void setup() {
+    fieldId = 0;
+
+    // Create an Iceberg data type information list with primitive types and nested types.
+    dataTypesToTest = new ArrayList<>();
+    dataTypesToTest.add(new DataTypeInfo("\"boolean\"", Types.BooleanType.get()));
+    dataTypesToTest.add(new DataTypeInfo("\"int\"", Types.IntegerType.get()));
+    dataTypesToTest.add(new DataTypeInfo("\"long\"", Types.LongType.get()));
+    dataTypesToTest.add(new DataTypeInfo("\"float\"", Types.FloatType.get()));
+    dataTypesToTest.add(new DataTypeInfo("\"double\"", Types.DoubleType.get()));
+    dataTypesToTest.add(new DataTypeInfo("\"string\"", Types.StringType.get()));
+    dataTypesToTest.add(new DataTypeInfo("\"date\"", Types.DateType.get()));
+    dataTypesToTest.add(new DataTypeInfo("\"time\"", Types.TimeType.get()));
+    dataTypesToTest.add(new DataTypeInfo("\"timestamptz\"", Types.TimestampType.withZone()));
+    dataTypesToTest.add(
+        new DataTypeInfo(
+            "{\"type\":\"struct\",\"fields\":[{\"id\":1,\"name\":\"first\",\"required\":false,\"type\":\"int\"},{\"id\":2,\"name\":\"second\",\"required\":false,\"type\":\"int\"}]}",
+            Types.StructType.of(
+                Types.NestedField.optional(generateFieldId(), "first", Types.IntegerType.get()),
+                Types.NestedField.optional(generateFieldId(), "second", Types.IntegerType.get()))));
+    dataTypesToTest.add(
+        new DataTypeInfo(
+            "{\"type\":\"list\",\"element-id\":3,\"element\":\"int\",\"element-required\":false}",
+            Types.ListType.ofOptional(generateFieldId(), Types.IntegerType.get())));
+    dataTypesToTest.add(
+        new DataTypeInfo(
+            "{\"type\":\"map\",\"key-id\":4,\"key\":\"int\",\"value-id\":5,\"value\":\"string\",\"value-required\":false}",
+            Types.MapType.ofOptional(
+                generateFieldId(),
+                generateFieldId(),
+                Types.IntegerType.get(),
+                Types.StringType.get())));
+  }
+
+  /** Helper function to generate a unique fieldId for nested types */
+  private int generateFieldId() {
+    fieldId++;
+    return fieldId;
+  }
+
+  /** Test for Iceberg data type deserialization.
*/ + @Test + public void testDeserializeIcebergType() { + for (int i = 0; i < dataTypesToTest.size(); i++) { + DataTypeInfo typeInfo = dataTypesToTest.get(i); + Type dataType = IcebergDataTypeParser.deserializeIcebergType(typeInfo.jsonStr); + Assert.assertEquals(typeInfo.icebergType, dataType); + } + } + + @Test + public void testDeserializeIcebergTypeFailed() { + String json = "bad json"; + IllegalArgumentException exception = + Assert.assertThrows( + IllegalArgumentException.class, + () -> IcebergDataTypeParser.deserializeIcebergType(json)); + Assert.assertEquals( + "Failed to deserialize Iceberg data type: bad json", exception.getMessage()); + } + + @Test + public void testUnsupportedIcebergType() { + String json = "{\"type\":\"unsupported\"}"; + IllegalArgumentException exception = + Assert.assertThrows( + IllegalArgumentException.class, + () -> IcebergDataTypeParser.deserializeIcebergType(json)); + Assert.assertEquals( + "Cannot parse Iceberg type: unsupported, schema: {\"type\":\"unsupported\"}", + exception.getMessage()); + } +} diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParserTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParserTest.java new file mode 100644 index 000000000..a0b4caa1c --- /dev/null +++ b/src/test/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParserTest.java @@ -0,0 +1,325 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + +package net.snowflake.ingest.streaming.internal; + +import static java.time.ZoneOffset.UTC; +import static net.snowflake.ingest.streaming.internal.ParquetBufferValue.BYTE_ARRAY_LENGTH_ENCODING_BYTE_LEN; +import static net.snowflake.ingest.streaming.internal.ParquetBufferValue.DEFINITION_LEVEL_ENCODING_BYTE_LEN; + +import java.math.BigDecimal; +import java.math.BigInteger; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; +import org.apache.parquet.schema.Type; +import org.apache.parquet.schema.Type.Repetition; +import org.apache.parquet.schema.Types; +import org.junit.Test; + +public class IcebergParquetValueParserTest { + + @Test + public void parseValueBoolean() { + Type type = + Types.primitive(PrimitiveTypeName.BOOLEAN, Repetition.OPTIONAL).named("BOOLEAN_COL"); + + RowBufferStats rowBufferStats = new RowBufferStats("BOOLEAN_COL"); + ParquetBufferValue pv = + IcebergParquetValueParser.parseColumnValueToParquet(true, type, rowBufferStats, UTC, 0); + ParquetValueParserAssertionBuilder.newBuilder() + .parquetBufferValue(pv) + .rowBufferStats(rowBufferStats) + .expectedValueClass(Boolean.class) + .expectedParsedValue(true) + .expectedSize(ParquetBufferValue.BIT_ENCODING_BYTE_LEN + DEFINITION_LEVEL_ENCODING_BYTE_LEN) + .expectedMinMax(BigInteger.valueOf(1)) + .assertMatches(); + } + + @Test + public void parseValueInt() { + Type type = Types.primitive(PrimitiveTypeName.INT32, Repetition.OPTIONAL).named("INT_COL"); + + RowBufferStats rowBufferStats = new RowBufferStats("INT_COL"); + ParquetBufferValue pv = + IcebergParquetValueParser.parseColumnValueToParquet( + Integer.MAX_VALUE, type, rowBufferStats, UTC, 0); + ParquetValueParserAssertionBuilder.newBuilder() + .parquetBufferValue(pv) + .rowBufferStats(rowBufferStats) + .expectedValueClass(Integer.class) + .expectedParsedValue(Integer.MAX_VALUE) + .expectedSize(4.0f + DEFINITION_LEVEL_ENCODING_BYTE_LEN) + .expectedMinMax(BigInteger.valueOf(Integer.MAX_VALUE)) + .assertMatches(); + } + + @Test + 
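+  // Editorial note: Parquet's LogicalTypeAnnotation.decimalType(scale, precision) takes scale
+  // first, so decimalType(4, 9) below denotes decimal(9, 4); precision <= 9 fits in INT32.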
public void parseValueDecimalToInt() { + Type type = + Types.primitive(PrimitiveTypeName.INT32, Repetition.OPTIONAL) + .as(LogicalTypeAnnotation.decimalType(4, 9)) + .named("DECIMAL_COL"); + + RowBufferStats rowBufferStats = new RowBufferStats("DECIMAL_COL"); + ParquetBufferValue pv = + IcebergParquetValueParser.parseColumnValueToParquet( + new BigDecimal("12345.6789"), type, rowBufferStats, UTC, 0); + ParquetValueParserAssertionBuilder.newBuilder() + .parquetBufferValue(pv) + .rowBufferStats(rowBufferStats) + .expectedValueClass(Integer.class) + .expectedParsedValue(123456789) + .expectedSize(4.0f + DEFINITION_LEVEL_ENCODING_BYTE_LEN) + .expectedMinMax(BigInteger.valueOf(123456789)) + .assertMatches(); + } + + @Test + public void parseValueDateToInt() { + Type type = + Types.primitive(PrimitiveTypeName.INT32, Repetition.OPTIONAL) + .as(LogicalTypeAnnotation.dateType()) + .named("DATE_COL"); + + RowBufferStats rowBufferStats = new RowBufferStats("DATE_COL"); + ParquetBufferValue pv = + IcebergParquetValueParser.parseColumnValueToParquet( + "2024-01-01", type, rowBufferStats, UTC, 0); + ParquetValueParserAssertionBuilder.newBuilder() + .parquetBufferValue(pv) + .rowBufferStats(rowBufferStats) + .expectedValueClass(Integer.class) + .expectedParsedValue(19723) + .expectedSize(4.0f + DEFINITION_LEVEL_ENCODING_BYTE_LEN) + .expectedMinMax(BigInteger.valueOf(19723)) + .assertMatches(); + } + + @Test + public void parseValueLong() { + Type type = Types.primitive(PrimitiveTypeName.INT64, Repetition.OPTIONAL).named("LONG_COL"); + + RowBufferStats rowBufferStats = new RowBufferStats("LONG_COL"); + ParquetBufferValue pv = + IcebergParquetValueParser.parseColumnValueToParquet( + Long.MAX_VALUE, type, rowBufferStats, UTC, 0); + ParquetValueParserAssertionBuilder.newBuilder() + .parquetBufferValue(pv) + .rowBufferStats(rowBufferStats) + .expectedValueClass(Long.class) + .expectedParsedValue(Long.MAX_VALUE) + .expectedSize(8.0f + DEFINITION_LEVEL_ENCODING_BYTE_LEN) + .expectedMinMax(BigInteger.valueOf(Long.MAX_VALUE)) + .assertMatches(); + } + + @Test + public void parseValueDecimalToLong() { + Type type = + Types.primitive(PrimitiveTypeName.INT64, Repetition.OPTIONAL) + .as(LogicalTypeAnnotation.decimalType(9, 18)) + .named("DECIMAL_COL"); + + RowBufferStats rowBufferStats = new RowBufferStats("DECIMAL_COL"); + ParquetBufferValue pv = + IcebergParquetValueParser.parseColumnValueToParquet( + new BigDecimal("123456789.123456789"), type, rowBufferStats, UTC, 0); + ParquetValueParserAssertionBuilder.newBuilder() + .parquetBufferValue(pv) + .rowBufferStats(rowBufferStats) + .expectedValueClass(Long.class) + .expectedParsedValue(123456789123456789L) + .expectedSize(8.0f + DEFINITION_LEVEL_ENCODING_BYTE_LEN) + .expectedMinMax(BigInteger.valueOf(123456789123456789L)) + .assertMatches(); + } + + @Test + public void parseValueTimeToLong() { + Type type = + Types.primitive(PrimitiveTypeName.INT64, Repetition.OPTIONAL) + .as(LogicalTypeAnnotation.timeType(false, LogicalTypeAnnotation.TimeUnit.MICROS)) + .named("TIME_COL"); + + RowBufferStats rowBufferStats = new RowBufferStats("TIME_COL"); + ParquetBufferValue pv = + IcebergParquetValueParser.parseColumnValueToParquet( + "12:34:56.789", type, rowBufferStats, UTC, 0); + ParquetValueParserAssertionBuilder.newBuilder() + .parquetBufferValue(pv) + .rowBufferStats(rowBufferStats) + .expectedValueClass(Long.class) + .expectedParsedValue(45296789000L) + .expectedSize(8.0f + DEFINITION_LEVEL_ENCODING_BYTE_LEN) + .expectedMinMax(BigInteger.valueOf(45296789000L)) + 
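+        // Editorial note: 12:34:56.789 is 45,296.789 seconds after midnight, i.e. 45296789000
+        // microseconds, matching the MICROS time unit asserted above.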
.assertMatches(); + } + + @Test + public void parseValueTimestampToLong() { + Type type = + Types.primitive(PrimitiveTypeName.INT64, Repetition.OPTIONAL) + .as(LogicalTypeAnnotation.timestampType(false, LogicalTypeAnnotation.TimeUnit.MICROS)) + .named("TIMESTAMP_COL"); + + RowBufferStats rowBufferStats = new RowBufferStats("TIMESTAMP_COL"); + ParquetBufferValue pv = + IcebergParquetValueParser.parseColumnValueToParquet( + "2024-01-01T12:34:56.789+08:00", type, rowBufferStats, UTC, 0); + ParquetValueParserAssertionBuilder.newBuilder() + .parquetBufferValue(pv) + .rowBufferStats(rowBufferStats) + .expectedValueClass(Long.class) + .expectedParsedValue(1704112496789000L) + .expectedSize(8.0f + DEFINITION_LEVEL_ENCODING_BYTE_LEN) + .expectedMinMax(BigInteger.valueOf(1704112496789000L)) + .assertMatches(); + } + + @Test + public void parseValueTimestampTZToLong() { + Type type = + Types.primitive(PrimitiveTypeName.INT64, Repetition.OPTIONAL) + .as(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MICROS)) + .named("TIMESTAMP_TZ_COL"); + + RowBufferStats rowBufferStats = new RowBufferStats("TIMESTAMP_TZ_COL"); + ParquetBufferValue pv = + IcebergParquetValueParser.parseColumnValueToParquet( + "2024-01-01T12:34:56.789+08:00", type, rowBufferStats, UTC, 0); + ParquetValueParserAssertionBuilder.newBuilder() + .parquetBufferValue(pv) + .rowBufferStats(rowBufferStats) + .expectedValueClass(Long.class) + .expectedParsedValue(1704083696789000L) + .expectedSize(8.0f + DEFINITION_LEVEL_ENCODING_BYTE_LEN) + .expectedMinMax(BigInteger.valueOf(1704083696789000L)) + .assertMatches(); + } + + @Test + public void parseValueFloat() { + Type type = Types.primitive(PrimitiveTypeName.FLOAT, Repetition.OPTIONAL).named("FLOAT_COL"); + + RowBufferStats rowBufferStats = new RowBufferStats("FLOAT_COL"); + ParquetBufferValue pv = + IcebergParquetValueParser.parseColumnValueToParquet( + Float.MAX_VALUE, type, rowBufferStats, UTC, 0); + ParquetValueParserAssertionBuilder.newBuilder() + .parquetBufferValue(pv) + .rowBufferStats(rowBufferStats) + .expectedValueClass(Float.class) + .expectedParsedValue(Float.MAX_VALUE) + .expectedSize(4.0f + DEFINITION_LEVEL_ENCODING_BYTE_LEN) + .expectedMinMax((double) Float.MAX_VALUE) + .assertMatches(); + } + + @Test + public void parseValueDouble() { + Type type = Types.primitive(PrimitiveTypeName.DOUBLE, Repetition.OPTIONAL).named("DOUBLE_COL"); + + RowBufferStats rowBufferStats = new RowBufferStats("DOUBLE_COL"); + ParquetBufferValue pv = + IcebergParquetValueParser.parseColumnValueToParquet( + Double.MAX_VALUE, type, rowBufferStats, UTC, 0); + ParquetValueParserAssertionBuilder.newBuilder() + .parquetBufferValue(pv) + .rowBufferStats(rowBufferStats) + .expectedValueClass(Double.class) + .expectedParsedValue(Double.MAX_VALUE) + .expectedSize(8.0f + DEFINITION_LEVEL_ENCODING_BYTE_LEN) + .expectedMinMax(Double.MAX_VALUE) + .assertMatches(); + } + + @Test + public void parseValueBinary() { + Type type = Types.primitive(PrimitiveTypeName.BINARY, Repetition.OPTIONAL).named("BINARY_COL"); + + RowBufferStats rowBufferStats = new RowBufferStats("BINARY_COL"); + byte[] value = "snowflake_to_the_moon".getBytes(); + ParquetBufferValue pv = + IcebergParquetValueParser.parseColumnValueToParquet(value, type, rowBufferStats, UTC, 0); + ParquetValueParserAssertionBuilder.newBuilder() + .parquetBufferValue(pv) + .rowBufferStats(rowBufferStats) + .expectedValueClass(byte[].class) + .expectedParsedValue(value) + .expectedSize( + value.length + DEFINITION_LEVEL_ENCODING_BYTE_LEN + 
BYTE_ARRAY_LENGTH_ENCODING_BYTE_LEN) + .expectedMinMax(value) + .assertMatches(); + } + + @Test + public void parseValueStringToBinary() { + Type type = + Types.primitive(PrimitiveTypeName.BINARY, Repetition.OPTIONAL) + .as(LogicalTypeAnnotation.stringType()) + .named("BINARY_COL"); + + RowBufferStats rowBufferStats = new RowBufferStats("BINARY_COL"); + String value = "snowflake_to_the_moon"; + ParquetBufferValue pv = + IcebergParquetValueParser.parseColumnValueToParquet(value, type, rowBufferStats, UTC, 0); + ParquetValueParserAssertionBuilder.newBuilder() + .parquetBufferValue(pv) + .rowBufferStats(rowBufferStats) + .expectedValueClass(byte[].class) + .expectedParsedValue(value.getBytes()) + .expectedSize( + value.getBytes().length + + DEFINITION_LEVEL_ENCODING_BYTE_LEN + + BYTE_ARRAY_LENGTH_ENCODING_BYTE_LEN) + .expectedMinMax(value.getBytes()) + .assertMatches(); + } + + @Test + public void parseValueFixed() { + Type type = + Types.primitive(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, Repetition.OPTIONAL) + .length(4) + .named("FIXED_COL"); + + RowBufferStats rowBufferStats = new RowBufferStats("FIXED_COL"); + byte[] value = "snow".getBytes(); + ParquetBufferValue pv = + IcebergParquetValueParser.parseColumnValueToParquet(value, type, rowBufferStats, UTC, 0); + ParquetValueParserAssertionBuilder.newBuilder() + .parquetBufferValue(pv) + .rowBufferStats(rowBufferStats) + .expectedValueClass(byte[].class) + .expectedParsedValue(value) + .expectedSize( + 4.0f + DEFINITION_LEVEL_ENCODING_BYTE_LEN + BYTE_ARRAY_LENGTH_ENCODING_BYTE_LEN) + .expectedMinMax(value) + .assertMatches(); + } + + @Test + public void parseValueDecimalToFixed() { + Type type = + Types.primitive(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, Repetition.OPTIONAL) + .length(9) + .as(LogicalTypeAnnotation.decimalType(10, 20)) + .named("FIXED_COL"); + + RowBufferStats rowBufferStats = new RowBufferStats("FIXED_COL"); + BigDecimal value = new BigDecimal("1234567890.0123456789"); + ParquetBufferValue pv = + IcebergParquetValueParser.parseColumnValueToParquet(value, type, rowBufferStats, UTC, 0); + ParquetValueParserAssertionBuilder.newBuilder() + .parquetBufferValue(pv) + .rowBufferStats(rowBufferStats) + .expectedValueClass(byte[].class) + .expectedParsedValue(value.unscaledValue().toByteArray()) + .expectedSize( + 9.0f + DEFINITION_LEVEL_ENCODING_BYTE_LEN + BYTE_ARRAY_LENGTH_ENCODING_BYTE_LEN) + .expectedMinMax(value.unscaledValue()) + .assertMatches(); + } +} diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/ParquetTypeGeneratorTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/ParquetTypeGeneratorTest.java index 3356ab277..c83d339c1 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/ParquetTypeGeneratorTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/ParquetTypeGeneratorTest.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. 
+ */ + package net.snowflake.ingest.streaming.internal; import java.util.Map; @@ -19,8 +23,7 @@ public void buildFieldFixedSB1() { .nullable(true) .build(); - ParquetTypeGenerator.ParquetTypeInfo typeInfo = - ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); + ParquetTypeInfo typeInfo = ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); createParquetTypeInfoAssertionBuilder() .typeInfo(typeInfo) .expectedFieldName("TESTCOL") @@ -45,8 +48,7 @@ public void buildFieldFixedSB2() { .nullable(false) .build(); - ParquetTypeGenerator.ParquetTypeInfo typeInfo = - ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); + ParquetTypeInfo typeInfo = ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); createParquetTypeInfoAssertionBuilder() .typeInfo(typeInfo) .expectedFieldName("TESTCOL") @@ -71,8 +73,7 @@ public void buildFieldFixedSB4() { .nullable(true) .build(); - ParquetTypeGenerator.ParquetTypeInfo typeInfo = - ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); + ParquetTypeInfo typeInfo = ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); createParquetTypeInfoAssertionBuilder() .typeInfo(typeInfo) .expectedFieldName("TESTCOL") @@ -97,8 +98,7 @@ public void buildFieldFixedSB8() { .nullable(true) .build(); - ParquetTypeGenerator.ParquetTypeInfo typeInfo = - ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); + ParquetTypeInfo typeInfo = ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); createParquetTypeInfoAssertionBuilder() .typeInfo(typeInfo) .expectedFieldName("TESTCOL") @@ -123,8 +123,7 @@ public void buildFieldFixedSB16() { .nullable(true) .build(); - ParquetTypeGenerator.ParquetTypeInfo typeInfo = - ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); + ParquetTypeInfo typeInfo = ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); createParquetTypeInfoAssertionBuilder() .typeInfo(typeInfo) .expectedFieldName("TESTCOL") @@ -149,8 +148,7 @@ public void buildFieldLobVariant() { .nullable(true) .build(); - ParquetTypeGenerator.ParquetTypeInfo typeInfo = - ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); + ParquetTypeInfo typeInfo = ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); createParquetTypeInfoAssertionBuilder() .typeInfo(typeInfo) .expectedFieldName("TESTCOL") @@ -176,8 +174,7 @@ public void buildFieldTimestampNtzSB8() { .nullable(true) .build(); - ParquetTypeGenerator.ParquetTypeInfo typeInfo = - ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); + ParquetTypeInfo typeInfo = ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); createParquetTypeInfoAssertionBuilder() .typeInfo(typeInfo) .expectedFieldName("TESTCOL") @@ -201,8 +198,7 @@ public void buildFieldTimestampNtzSB16() { .nullable(true) .build(); - ParquetTypeGenerator.ParquetTypeInfo typeInfo = - ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); + ParquetTypeInfo typeInfo = ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); createParquetTypeInfoAssertionBuilder() .typeInfo(typeInfo) .expectedFieldName("TESTCOL") @@ -226,8 +222,7 @@ public void buildFieldTimestampTzSB8() { .nullable(true) .build(); - ParquetTypeGenerator.ParquetTypeInfo typeInfo = - ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); + ParquetTypeInfo typeInfo = ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); createParquetTypeInfoAssertionBuilder() .typeInfo(typeInfo) .expectedFieldName("TESTCOL") @@ -251,8 +246,7 @@ public void 
buildFieldTimestampTzSB16() { .nullable(true) .build(); - ParquetTypeGenerator.ParquetTypeInfo typeInfo = - ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); + ParquetTypeInfo typeInfo = ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); createParquetTypeInfoAssertionBuilder() .typeInfo(typeInfo) .expectedFieldName("TESTCOL") @@ -276,8 +270,7 @@ public void buildFieldDate() { .nullable(true) .build(); - ParquetTypeGenerator.ParquetTypeInfo typeInfo = - ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); + ParquetTypeInfo typeInfo = ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); createParquetTypeInfoAssertionBuilder() .typeInfo(typeInfo) .expectedFieldName("TESTCOL") @@ -301,8 +294,7 @@ public void buildFieldTimeSB4() { .nullable(true) .build(); - ParquetTypeGenerator.ParquetTypeInfo typeInfo = - ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); + ParquetTypeInfo typeInfo = ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); createParquetTypeInfoAssertionBuilder() .typeInfo(typeInfo) .expectedFieldName("TESTCOL") @@ -326,8 +318,7 @@ public void buildFieldTimeSB8() { .nullable(true) .build(); - ParquetTypeGenerator.ParquetTypeInfo typeInfo = - ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); + ParquetTypeInfo typeInfo = ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); createParquetTypeInfoAssertionBuilder() .typeInfo(typeInfo) .expectedFieldName("TESTCOL") @@ -351,8 +342,7 @@ public void buildFieldBoolean() { .nullable(true) .build(); - ParquetTypeGenerator.ParquetTypeInfo typeInfo = - ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); + ParquetTypeInfo typeInfo = ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); createParquetTypeInfoAssertionBuilder() .typeInfo(typeInfo) .expectedFieldName("TESTCOL") @@ -376,8 +366,7 @@ public void buildFieldRealSB16() { .nullable(true) .build(); - ParquetTypeGenerator.ParquetTypeInfo typeInfo = - ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); + ParquetTypeInfo typeInfo = ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); createParquetTypeInfoAssertionBuilder() .typeInfo(typeInfo) .expectedFieldName("TESTCOL") @@ -392,23 +381,336 @@ public void buildFieldRealSB16() { .assertMatches(); } + @Test + public void buildFieldIcebergBoolean() { + ColumnMetadata testCol = + createColumnMetadataBuilder() + .logicalType("") + .sourceIcebergDataType("\"boolean\"") + .nullable(true) + .build(); + + ParquetTypeInfo typeInfo = ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); + createParquetTypeInfoAssertionBuilder() + .typeInfo(typeInfo) + .expectedFieldName("TESTCOL") + .expectedTypeLength(0) + .expectedPrimitiveTypeName(PrimitiveType.PrimitiveTypeName.BOOLEAN) + .expectedLogicalTypeAnnotation(null) + .expectedRepetition(Type.Repetition.OPTIONAL) + .expectedColMetaData(null) + .assertMatches(); + } + + @Test + public void buildFieldIcebergInt() { + ColumnMetadata testCol = + createColumnMetadataBuilder() + .logicalType("") + .sourceIcebergDataType("\"int\"") + .nullable(true) + .build(); + + ParquetTypeInfo typeInfo = ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); + createParquetTypeInfoAssertionBuilder() + .typeInfo(typeInfo) + .expectedFieldName("TESTCOL") + .expectedTypeLength(0) + .expectedPrimitiveTypeName(PrimitiveType.PrimitiveTypeName.INT32) + .expectedLogicalTypeAnnotation(null) + .expectedRepetition(Type.Repetition.OPTIONAL) + .expectedColMetaData(null) + 
.assertMatches(); + } + + @Test + public void buildFieldIcebergLong() { + ColumnMetadata testCol = + createColumnMetadataBuilder() + .logicalType("") + .sourceIcebergDataType("\"long\"") + .nullable(true) + .build(); + + ParquetTypeInfo typeInfo = ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); + createParquetTypeInfoAssertionBuilder() + .typeInfo(typeInfo) + .expectedFieldName("TESTCOL") + .expectedTypeLength(0) + .expectedPrimitiveTypeName(PrimitiveType.PrimitiveTypeName.INT64) + .expectedLogicalTypeAnnotation(null) + .expectedRepetition(Type.Repetition.OPTIONAL) + .expectedColMetaData(null) + .assertMatches(); + } + + @Test + public void buildFieldIcebergFloat() { + ColumnMetadata testCol = + createColumnMetadataBuilder() + .logicalType("") + .sourceIcebergDataType("\"float\"") + .nullable(true) + .build(); + + ParquetTypeInfo typeInfo = ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); + createParquetTypeInfoAssertionBuilder() + .typeInfo(typeInfo) + .expectedFieldName("TESTCOL") + .expectedTypeLength(0) + .expectedPrimitiveTypeName(PrimitiveType.PrimitiveTypeName.FLOAT) + .expectedLogicalTypeAnnotation(null) + .expectedRepetition(Type.Repetition.OPTIONAL) + .expectedColMetaData(null) + .assertMatches(); + } + + @Test + public void buildFieldIcebergDouble() { + ColumnMetadata testCol = + createColumnMetadataBuilder() + .logicalType("") + .sourceIcebergDataType("\"double\"") + .nullable(true) + .build(); + + ParquetTypeInfo typeInfo = ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); + createParquetTypeInfoAssertionBuilder() + .typeInfo(typeInfo) + .expectedFieldName("TESTCOL") + .expectedTypeLength(0) + .expectedPrimitiveTypeName(PrimitiveType.PrimitiveTypeName.DOUBLE) + .expectedLogicalTypeAnnotation(null) + .expectedRepetition(Type.Repetition.OPTIONAL) + .expectedColMetaData(null) + .assertMatches(); + } + + @Test + public void buildFieldIcebergDecimal() { + ColumnMetadata testCol = + createColumnMetadataBuilder() + .logicalType("") + .sourceIcebergDataType("\"decimal(9, 2)\"") + .nullable(true) + .build(); + + ParquetTypeInfo typeInfo = ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); + createParquetTypeInfoAssertionBuilder() + .typeInfo(typeInfo) + .expectedFieldName("TESTCOL") + .expectedTypeLength(0) + .expectedPrimitiveTypeName(PrimitiveType.PrimitiveTypeName.INT32) + .expectedLogicalTypeAnnotation(LogicalTypeAnnotation.decimalType(2, 9)) + .expectedRepetition(Type.Repetition.OPTIONAL) + .expectedColMetaData(null) + .assertMatches(); + + testCol = + createColumnMetadataBuilder() + .logicalType("") + .sourceIcebergDataType("\"decimal(10, 4)\"") + .nullable(true) + .build(); + + typeInfo = ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); + createParquetTypeInfoAssertionBuilder() + .typeInfo(typeInfo) + .expectedFieldName("TESTCOL") + .expectedTypeLength(0) + .expectedPrimitiveTypeName(PrimitiveType.PrimitiveTypeName.INT64) + .expectedLogicalTypeAnnotation(LogicalTypeAnnotation.decimalType(4, 10)) + .expectedRepetition(Type.Repetition.OPTIONAL) + .expectedColMetaData(null) + .assertMatches(); + + testCol = + createColumnMetadataBuilder() + .logicalType("") + .sourceIcebergDataType("\"decimal(19, 1)\"") + .nullable(true) + .build(); + + typeInfo = ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); + createParquetTypeInfoAssertionBuilder() + .typeInfo(typeInfo) + .expectedFieldName("TESTCOL") + .expectedTypeLength(9) + .expectedPrimitiveTypeName(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) 
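+        // Editorial note: Iceberg decimals appear to map to the smallest Parquet physical type
+        // that holds the precision: INT32 up to 9 digits, INT64 up to 18 digits, and a
+        // FIXED_LEN_BYTE_ARRAY sized to the precision beyond that (9 bytes for 19 digits).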
+ .expectedLogicalTypeAnnotation(LogicalTypeAnnotation.decimalType(1, 19)) + .expectedRepetition(Type.Repetition.OPTIONAL) + .expectedColMetaData(null) + .assertMatches(); + } + + @Test + public void buildFieldIcebergDate() { + ColumnMetadata testCol = + createColumnMetadataBuilder() + .logicalType("") + .sourceIcebergDataType("\"date\"") + .nullable(true) + .build(); + + ParquetTypeInfo typeInfo = ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); + createParquetTypeInfoAssertionBuilder() + .typeInfo(typeInfo) + .expectedFieldName("TESTCOL") + .expectedTypeLength(0) + .expectedPrimitiveTypeName(PrimitiveType.PrimitiveTypeName.INT32) + .expectedLogicalTypeAnnotation(LogicalTypeAnnotation.dateType()) + .expectedRepetition(Type.Repetition.OPTIONAL) + .expectedColMetaData(null) + .assertMatches(); + } + + @Test + public void buildFieldIcebergTime() { + ColumnMetadata testCol = + createColumnMetadataBuilder() + .logicalType("") + .sourceIcebergDataType("\"time\"") + .nullable(true) + .build(); + + ParquetTypeInfo typeInfo = ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); + createParquetTypeInfoAssertionBuilder() + .typeInfo(typeInfo) + .expectedFieldName("TESTCOL") + .expectedTypeLength(0) + .expectedPrimitiveTypeName(PrimitiveType.PrimitiveTypeName.INT64) + .expectedLogicalTypeAnnotation( + LogicalTypeAnnotation.timeType(false, LogicalTypeAnnotation.TimeUnit.MICROS)) + .expectedRepetition(Type.Repetition.OPTIONAL) + .expectedColMetaData(null) + .assertMatches(); + } + + @Test + public void buildFieldIcebergTimeStamp() { + ColumnMetadata testCol = + createColumnMetadataBuilder() + .logicalType("") + .sourceIcebergDataType("\"timestamp\"") + .nullable(true) + .build(); + + ParquetTypeInfo typeInfo = ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); + createParquetTypeInfoAssertionBuilder() + .typeInfo(typeInfo) + .expectedFieldName("TESTCOL") + .expectedTypeLength(0) + .expectedPrimitiveTypeName(PrimitiveType.PrimitiveTypeName.INT64) + .expectedLogicalTypeAnnotation( + LogicalTypeAnnotation.timestampType(false, LogicalTypeAnnotation.TimeUnit.MICROS)) + .expectedRepetition(Type.Repetition.OPTIONAL) + .expectedColMetaData(null) + .assertMatches(); + } + + @Test + public void buildFieldIcebergTimeStampTZ() { + ColumnMetadata testCol = + createColumnMetadataBuilder() + .logicalType("") + .sourceIcebergDataType("\"timestamptz\"") + .nullable(true) + .build(); + + ParquetTypeInfo typeInfo = ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); + createParquetTypeInfoAssertionBuilder() + .typeInfo(typeInfo) + .expectedFieldName("TESTCOL") + .expectedTypeLength(0) + .expectedPrimitiveTypeName(PrimitiveType.PrimitiveTypeName.INT64) + .expectedLogicalTypeAnnotation( + LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MICROS)) + .expectedRepetition(Type.Repetition.OPTIONAL) + .expectedColMetaData(null) + .assertMatches(); + } + + @Test + public void buildFieldIcebergString() { + ColumnMetadata testCol = + createColumnMetadataBuilder() + .logicalType("") + .sourceIcebergDataType("\"string\"") + .nullable(true) + .build(); + + ParquetTypeInfo typeInfo = ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); + createParquetTypeInfoAssertionBuilder() + .typeInfo(typeInfo) + .expectedFieldName("TESTCOL") + .expectedTypeLength(0) + .expectedPrimitiveTypeName(PrimitiveType.PrimitiveTypeName.BINARY) + .expectedLogicalTypeAnnotation(LogicalTypeAnnotation.stringType()) + .expectedRepetition(Type.Repetition.OPTIONAL) + 
.expectedColMetaData(null) + .assertMatches(); + } + + @Test + public void buildFieldIcebergFixed() { + ColumnMetadata testCol = + createColumnMetadataBuilder() + .logicalType("") + .sourceIcebergDataType("\"fixed[16]\"") + .nullable(true) + .build(); + + ParquetTypeInfo typeInfo = ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); + createParquetTypeInfoAssertionBuilder() + .typeInfo(typeInfo) + .expectedFieldName("TESTCOL") + .expectedTypeLength(16) + .expectedPrimitiveTypeName(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) + .expectedLogicalTypeAnnotation(null) + .expectedRepetition(Type.Repetition.OPTIONAL) + .expectedColMetaData(null) + .assertMatches(); + } + + @Test + public void buildFieldIcebergBinary() { + ColumnMetadata testCol = + createColumnMetadataBuilder() + .logicalType("") + .sourceIcebergDataType("\"binary\"") + .nullable(true) + .build(); + + ParquetTypeInfo typeInfo = ParquetTypeGenerator.generateColumnParquetTypeInfo(testCol, 0); + createParquetTypeInfoAssertionBuilder() + .typeInfo(typeInfo) + .expectedFieldName("TESTCOL") + .expectedTypeLength(0) + .expectedPrimitiveTypeName(PrimitiveType.PrimitiveTypeName.BINARY) + .expectedLogicalTypeAnnotation(null) + .expectedRepetition(Type.Repetition.OPTIONAL) + .expectedColMetaData(null) + .assertMatches(); + } + /** Builder that helps to assert parquet type info */ private static class ParquetTypeInfoAssertionBuilder { private String fieldName; - private int fieldId; + private Integer fieldId; private PrimitiveType.PrimitiveTypeName primitiveTypeName; private LogicalTypeAnnotation logicalTypeAnnotation; private Type.Repetition repetition; - private int typeLength; + private Integer typeLength; private String colMetadata; - private ParquetTypeGenerator.ParquetTypeInfo typeInfo; + private ParquetTypeInfo typeInfo; + private Integer fieldCount; static ParquetTypeInfoAssertionBuilder newBuilder() { ParquetTypeInfoAssertionBuilder builder = new ParquetTypeInfoAssertionBuilder(); return builder; } - ParquetTypeInfoAssertionBuilder typeInfo(ParquetTypeGenerator.ParquetTypeInfo typeInfo) { + ParquetTypeInfoAssertionBuilder typeInfo(ParquetTypeInfo typeInfo) { this.typeInfo = typeInfo; return this; } @@ -450,17 +752,32 @@ ParquetTypeInfoAssertionBuilder expectedColMetaData(String colMetaData) { return this; } + ParquetTypeInfoAssertionBuilder expectedFieldCount(int fieldCount) { + this.fieldCount = fieldCount; + return this; + } + void assertMatches() { Type type = typeInfo.getParquetType(); Map metadata = typeInfo.getMetadata(); Assert.assertEquals(fieldName, type.getName()); - Assert.assertEquals(typeLength, type.asPrimitiveType().getTypeLength()); - Assert.assertEquals(fieldId, type.asPrimitiveType().getId().intValue()); - - Assert.assertEquals(primitiveTypeName, type.asPrimitiveType().getPrimitiveTypeName()); + if (typeLength != null) { + Assert.assertEquals(typeLength.intValue(), type.asPrimitiveType().getTypeLength()); + } + if (fieldId != null) { + Assert.assertEquals(fieldId.intValue(), type.asPrimitiveType().getId().intValue()); + } + if (primitiveTypeName != null) { + Assert.assertEquals(primitiveTypeName, type.asPrimitiveType().getPrimitiveTypeName()); + } Assert.assertEquals(logicalTypeAnnotation, type.getLogicalTypeAnnotation()); Assert.assertEquals(repetition, type.getRepetition()); - Assert.assertEquals(colMetadata, metadata.get(type.getId().toString())); + if (metadata != null) { + Assert.assertEquals(colMetadata, metadata.get(type.getId().toString())); + } + if (fieldCount != null) { + 
Assert.assertEquals(fieldCount.intValue(), type.asGroupType().getFieldCount()); + } } } @@ -469,6 +786,13 @@ private static ColumnMetadataBuilder createColumnMetadataBuilder() { } private static ParquetTypeInfoAssertionBuilder createParquetTypeInfoAssertionBuilder() { - return ParquetTypeInfoAssertionBuilder.newBuilder().expectedFieldId(COL_ORDINAL); + return createParquetTypeInfoAssertionBuilder(true); + } + + private static ParquetTypeInfoAssertionBuilder createParquetTypeInfoAssertionBuilder( + boolean expectedFieldId) { + return expectedFieldId + ? ParquetTypeInfoAssertionBuilder.newBuilder().expectedFieldId(COL_ORDINAL) + : ParquetTypeInfoAssertionBuilder.newBuilder(); } } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/ParquetValueParserAssertionBuilder.java b/src/test/java/net/snowflake/ingest/streaming/internal/ParquetValueParserAssertionBuilder.java new file mode 100644 index 000000000..8480311fa --- /dev/null +++ b/src/test/java/net/snowflake/ingest/streaming/internal/ParquetValueParserAssertionBuilder.java @@ -0,0 +1,104 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + +package net.snowflake.ingest.streaming.internal; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.nio.charset.StandardCharsets; +import org.junit.Assert; + +/** Builder that helps to assert parsing of values to parquet types */ +class ParquetValueParserAssertionBuilder { + private ParquetBufferValue parquetBufferValue; + private RowBufferStats rowBufferStats; + private Class valueClass; + private Object value; + private float size; + private Object minMaxStat; + private long currentNullCount; + + static ParquetValueParserAssertionBuilder newBuilder() { + ParquetValueParserAssertionBuilder builder = new ParquetValueParserAssertionBuilder(); + return builder; + } + + ParquetValueParserAssertionBuilder parquetBufferValue(ParquetBufferValue parquetBufferValue) { + this.parquetBufferValue = parquetBufferValue; + return this; + } + + ParquetValueParserAssertionBuilder rowBufferStats(RowBufferStats rowBufferStats) { + this.rowBufferStats = rowBufferStats; + return this; + } + + ParquetValueParserAssertionBuilder expectedValueClass(Class valueClass) { + this.valueClass = valueClass; + return this; + } + + ParquetValueParserAssertionBuilder expectedParsedValue(Object value) { + this.value = value; + return this; + } + + ParquetValueParserAssertionBuilder expectedSize(float size) { + this.size = size; + return this; + } + + public ParquetValueParserAssertionBuilder expectedMinMax(Object minMaxStat) { + this.minMaxStat = minMaxStat; + return this; + } + + public ParquetValueParserAssertionBuilder expectedNullCount(long currentNullCount) { + this.currentNullCount = currentNullCount; + return this; + } + + void assertMatches() { + Assert.assertEquals(valueClass, parquetBufferValue.getValue().getClass()); + if (valueClass.equals(byte[].class)) { + Assert.assertArrayEquals((byte[]) value, (byte[]) parquetBufferValue.getValue()); + } else { + Assert.assertEquals(value, parquetBufferValue.getValue()); + } + Assert.assertEquals(size, parquetBufferValue.getSize(), 0); + if (minMaxStat instanceof BigInteger) { + Assert.assertEquals(minMaxStat, rowBufferStats.getCurrentMinIntValue()); + Assert.assertEquals(minMaxStat, rowBufferStats.getCurrentMaxIntValue()); + return; + } else if (minMaxStat instanceof byte[]) { + Assert.assertArrayEquals((byte[]) minMaxStat, rowBufferStats.getCurrentMinStrValue()); + Assert.assertArrayEquals((byte[]) minMaxStat, 
rowBufferStats.getCurrentMaxStrValue()); + return; + } else if (valueClass.equals(String.class)) { + // String can have null min/max stats for variant data types + Object min = + rowBufferStats.getCurrentMinStrValue() != null + ? new String(rowBufferStats.getCurrentMinStrValue(), StandardCharsets.UTF_8) + : rowBufferStats.getCurrentMinStrValue(); + Object max = + rowBufferStats.getCurrentMaxStrValue() != null + ? new String(rowBufferStats.getCurrentMaxStrValue(), StandardCharsets.UTF_8) + : rowBufferStats.getCurrentMaxStrValue(); + Assert.assertEquals(minMaxStat, min); + Assert.assertEquals(minMaxStat, max); + return; + } else if (minMaxStat instanceof Double || minMaxStat instanceof BigDecimal) { + Assert.assertEquals(minMaxStat, rowBufferStats.getCurrentMinRealValue()); + Assert.assertEquals(minMaxStat, rowBufferStats.getCurrentMaxRealValue()); + return; + } + throw new IllegalArgumentException( + String.format("Unknown data type for min stat: %s", minMaxStat.getClass())); + } + + void assertNull() { + Assert.assertNull(parquetBufferValue.getValue()); + Assert.assertEquals(currentNullCount, rowBufferStats.getCurrentNullCount()); + } +} diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java index a25bdf7c1..70b950fda 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java @@ -19,6 +19,7 @@ import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import net.snowflake.ingest.connection.RequestBuilder; import net.snowflake.ingest.streaming.InsertValidationResponse; import net.snowflake.ingest.streaming.OpenChannelRequest; @@ -26,12 +27,23 @@ import net.snowflake.ingest.utils.ErrorCode; import net.snowflake.ingest.utils.SFException; import org.apache.commons.codec.binary.Hex; +import org.apache.commons.lang3.StringUtils; import org.apache.parquet.hadoop.BdecParquetReader; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +@RunWith(Parameterized.class) public class RowBufferTest { + @Parameterized.Parameters(name = "isIcebergMode: {0}") + public static Object[] isIcebergMode() { + return new Object[] {false, true}; + } + + @Parameterized.Parameter public static boolean isIcebergMode; + private AbstractRowBuffer rowBufferOnErrorContinue; private AbstractRowBuffer rowBufferOnErrorAbort; private AbstractRowBuffer rowBufferOnErrorSkipBatch; @@ -105,6 +117,16 @@ static List createSchema() { colChar.setLength(11); colChar.setScale(0); + if (isIcebergMode) { + colTinyIntCase.setSourceIcebergDataType("\"decimal(2,0)\""); + colTinyInt.setSourceIcebergDataType("\"decimal(1,0)\""); + colSmallInt.setSourceIcebergDataType("\"decimal(2,0)\""); + colInt.setSourceIcebergDataType("\"int\""); + colBigInt.setSourceIcebergDataType("\"long\""); + colDecimal.setSourceIcebergDataType("\"decimal(38,2)\""); + colChar.setSourceIcebergDataType("\"string\""); + } + List columns = Arrays.asList( colTinyIntCase, colTinyInt, colSmallInt, colInt, colBigInt, colDecimal, colChar); @@ -127,7 +149,8 @@ private AbstractRowBuffer createTestBuffer(OpenChannelRequest.OnErrorOption o MAX_CHUNK_SIZE_IN_BYTES_DEFAULT, MAX_ALLOWED_ROW_SIZE_IN_BYTES_DEFAULT, Constants.BdecParquetCompression.GZIP, - 
ENABLE_NEW_JSON_PARSING_LOGIC_DEFAULT), + ENABLE_NEW_JSON_PARSING_LOGIC_DEFAULT, + isIcebergMode), null, null); } @@ -273,9 +296,12 @@ public void testInvalidPhysicalType() { @Test public void testStringLength() { - testStringLengthHelper(this.rowBufferOnErrorContinue); - testStringLengthHelper(this.rowBufferOnErrorAbort); - testStringLengthHelper(this.rowBufferOnErrorSkipBatch); + /* Iceberg cannot specify max length of string */ + if (!isIcebergMode) { + testStringLengthHelper(this.rowBufferOnErrorContinue); + testStringLengthHelper(this.rowBufferOnErrorAbort); + testStringLengthHelper(this.rowBufferOnErrorSkipBatch); + } } @Test @@ -293,7 +319,7 @@ public void testRowIndexWithMultipleRowsWithErrorHelper(AbstractRowBuffer row rows.add(row); row = new HashMap<>(); - row.put("colChar", "1111111111111111111111"); // too big + row.put("colChar", StringUtils.repeat('1', 16777217)); // too big rows.add(row); row = new HashMap<>(); @@ -301,7 +327,7 @@ public void testRowIndexWithMultipleRowsWithErrorHelper(AbstractRowBuffer row rows.add(row); row = new HashMap<>(); - row.put("colChar", "1111111111111111111111"); // too big + row.put("colChar", StringUtils.repeat('1', 16777217)); // too big rows.add(row); InsertValidationResponse response = rowBuffer.insertRows(rows, null, null); @@ -333,8 +359,8 @@ public void testRowIndexWithMultipleRowsWithErrorHelper(AbstractRowBuffer row .equalsIgnoreCase( "The given row cannot be converted to the internal format due to invalid value:" + " Value cannot be ingested into Snowflake column COLCHAR of type STRING," - + " rowIndex:1, reason: String too long: length=22 characters maxLength=11" - + " characters")); + + " rowIndex:1, reason: String too long: length=16777217 bytes" + + " maxLength=16777216 bytes")); Assert.assertTrue( response .getInsertErrors() @@ -344,8 +370,8 @@ public void testRowIndexWithMultipleRowsWithErrorHelper(AbstractRowBuffer row .equalsIgnoreCase( "The given row cannot be converted to the internal format due to invalid value:" + " Value cannot be ingested into Snowflake column COLCHAR of type STRING," - + " rowIndex:3, reason: String too long: length=22 characters maxLength=11" - + " characters")); + + " rowIndex:3, reason: String too long: length=16777217 bytes" + + " maxLength=16777216 bytes")); } private void testStringLengthHelper(AbstractRowBuffer rowBuffer) { @@ -813,6 +839,12 @@ private void testStatsE2ETimestampHelper(OpenChannelRequest.OnErrorOption onErro colTimestampLtzSB16Scale6.setLogicalType("TIMESTAMP_LTZ"); colTimestampLtzSB16Scale6.setScale(6); + if (isIcebergMode) { + colTimestampLtzSB8.setSourceIcebergDataType("\"timestamptz\""); + colTimestampLtzSB16.setSourceIcebergDataType("\"timestamptz\""); + colTimestampLtzSB16Scale6.setSourceIcebergDataType("\"timestamptz\""); + } + innerBuffer.setupSchema( Arrays.asList(colTimestampLtzSB8, colTimestampLtzSB16, colTimestampLtzSB16Scale6)); @@ -838,18 +870,23 @@ private void testStatsE2ETimestampHelper(OpenChannelRequest.OnErrorOption onErro Assert.assertEquals(3, result.getRowCount()); Assert.assertEquals( - BigInteger.valueOf(1621899220), + BigInteger.valueOf(1621899220 * (isIcebergMode ? 1000000L : 1)), result.getColumnEps().get("COLTIMESTAMPLTZ_SB8").getCurrentMinIntValue()); Assert.assertEquals( - BigInteger.valueOf(1621899221), + BigInteger.valueOf(1621899221 * (isIcebergMode ? 
1000000L : 1)), result.getColumnEps().get("COLTIMESTAMPLTZ_SB8").getCurrentMaxIntValue()); - Assert.assertEquals( - new BigInteger("1621899220123456789"), - result.getColumnEps().get("COLTIMESTAMPLTZ_SB16").getCurrentMinIntValue()); - Assert.assertEquals( - new BigInteger("1621899220223456789"), - result.getColumnEps().get("COLTIMESTAMPLTZ_SB16").getCurrentMaxIntValue()); + /* Iceberg only supports microsecond precision for TIMESTAMP_LTZ */ + if (!isIcebergMode) { + Assert.assertEquals( + new BigInteger("1621899220123456789"), + result.getColumnEps().get("COLTIMESTAMPLTZ_SB16").getCurrentMinIntValue()); + Assert.assertEquals( + new BigInteger("1621899220223456789"), + result.getColumnEps().get("COLTIMESTAMPLTZ_SB16").getCurrentMaxIntValue()); + Assert.assertEquals( + 1, result.getColumnEps().get("COLTIMESTAMPLTZ_SB16").getCurrentNullCount()); + } Assert.assertEquals( new BigInteger("1621899220123456"), @@ -859,7 +896,6 @@ private void testStatsE2ETimestampHelper(OpenChannelRequest.OnErrorOption onErro result.getColumnEps().get("COLTIMESTAMPLTZ_SB16_SCALE6").getCurrentMaxIntValue()); Assert.assertEquals(1, result.getColumnEps().get("COLTIMESTAMPLTZ_SB8").getCurrentNullCount()); - Assert.assertEquals(1, result.getColumnEps().get("COLTIMESTAMPLTZ_SB16").getCurrentNullCount()); Assert.assertEquals( 1, result.getColumnEps().get("COLTIMESTAMPLTZ_SB16_SCALE6").getCurrentNullCount()); } @@ -940,6 +976,11 @@ private void testE2ETimeHelper(OpenChannelRequest.OnErrorOption onErrorOption) { colTimeSB8.setLogicalType("TIME"); colTimeSB8.setScale(3); + if (isIcebergMode) { + colTimeSB4.setSourceIcebergDataType("\"time\""); + colTimeSB8.setSourceIcebergDataType("\"time\""); + } + innerBuffer.setupSchema(Arrays.asList(colTimeSB4, colTimeSB8)); Map row1 = new HashMap<>(); @@ -959,34 +1000,65 @@ private void testE2ETimeHelper(OpenChannelRequest.OnErrorOption onErrorOption) { Assert.assertFalse(response.hasErrors()); // Check data was inserted into the buffer correctly - Assert.assertEquals(10 * 60 * 60, innerBuffer.getVectorValueAt("COLTIMESB4", 0)); - Assert.assertEquals(11 * 60 * 60 + 15 * 60, innerBuffer.getVectorValueAt("COLTIMESB4", 1)); - Assert.assertNull(innerBuffer.getVectorValueAt("COLTIMESB4", 2)); + if (isIcebergMode) { + Assert.assertEquals(10 * 60 * 60 * 1000000L, innerBuffer.getVectorValueAt("COLTIMESB4", 0)); + Assert.assertEquals( + (11 * 60 * 60 + 15 * 60) * 1000000L, innerBuffer.getVectorValueAt("COLTIMESB4", 1)); + Assert.assertEquals( + (10 * 60 * 60 * 1000L + 123) * 1000L, innerBuffer.getVectorValueAt("COLTIMESB8", 0)); + Assert.assertEquals( + (11 * 60 * 60 * 1000L + 15 * 60 * 1000 + 456) * 1000L, + innerBuffer.getVectorValueAt("COLTIMESB8", 1)); + } else { + Assert.assertEquals(10 * 60 * 60, innerBuffer.getVectorValueAt("COLTIMESB4", 0)); + Assert.assertEquals(11 * 60 * 60 + 15 * 60, innerBuffer.getVectorValueAt("COLTIMESB4", 1)); + Assert.assertEquals( + 10 * 60 * 60 * 1000L + 123, innerBuffer.getVectorValueAt("COLTIMESB8", 0)); + Assert.assertEquals( + 11 * 60 * 60 * 1000L + 15 * 60 * 1000 + 456, + innerBuffer.getVectorValueAt("COLTIMESB8", 1)); + } - Assert.assertEquals(10 * 60 * 60 * 1000L + 123, innerBuffer.getVectorValueAt("COLTIMESB8", 0)); - Assert.assertEquals( - 11 * 60 * 60 * 1000L + 15 * 60 * 1000 + 456, innerBuffer.getVectorValueAt("COLTIMESB8", 1)); + Assert.assertNull(innerBuffer.getVectorValueAt("COLTIMESB4", 2)); Assert.assertNull(innerBuffer.getVectorValueAt("COLTIMESB8", 2)); // Check stats generation ChannelData result = innerBuffer.flush(); Assert.assertEquals(3, 
result.getRowCount()); - Assert.assertEquals( - BigInteger.valueOf(10 * 60 * 60), - result.getColumnEps().get("COLTIMESB4").getCurrentMinIntValue()); - Assert.assertEquals( - BigInteger.valueOf(11 * 60 * 60 + 15 * 60), - result.getColumnEps().get("COLTIMESB4").getCurrentMaxIntValue()); - Assert.assertEquals(1, result.getColumnEps().get("COLTIMESB4").getCurrentNullCount()); + if (isIcebergMode) { + Assert.assertEquals( + BigInteger.valueOf(10 * 60 * 60 * 1000000L), + result.getColumnEps().get("COLTIMESB4").getCurrentMinIntValue()); + Assert.assertEquals( + BigInteger.valueOf((11 * 60 * 60 + 15 * 60) * 1000000L), + result.getColumnEps().get("COLTIMESB4").getCurrentMaxIntValue()); + Assert.assertEquals(1, result.getColumnEps().get("COLTIMESB4").getCurrentNullCount()); - Assert.assertEquals( - BigInteger.valueOf(10 * 60 * 60 * 1000L + 123), - result.getColumnEps().get("COLTIMESB8").getCurrentMinIntValue()); - Assert.assertEquals( - BigInteger.valueOf(11 * 60 * 60 * 1000L + 15 * 60 * 1000 + 456), - result.getColumnEps().get("COLTIMESB8").getCurrentMaxIntValue()); - Assert.assertEquals(1, result.getColumnEps().get("COLTIMESB8").getCurrentNullCount()); + Assert.assertEquals( + BigInteger.valueOf((10 * 60 * 60 * 1000L + 123) * 1000L), + result.getColumnEps().get("COLTIMESB8").getCurrentMinIntValue()); + Assert.assertEquals( + BigInteger.valueOf((11 * 60 * 60 * 1000L + 15 * 60 * 1000 + 456) * 1000L), + result.getColumnEps().get("COLTIMESB8").getCurrentMaxIntValue()); + Assert.assertEquals(1, result.getColumnEps().get("COLTIMESB8").getCurrentNullCount()); + } else { + Assert.assertEquals( + BigInteger.valueOf(10 * 60 * 60), + result.getColumnEps().get("COLTIMESB4").getCurrentMinIntValue()); + Assert.assertEquals( + BigInteger.valueOf(11 * 60 * 60 + 15 * 60), + result.getColumnEps().get("COLTIMESB4").getCurrentMaxIntValue()); + Assert.assertEquals(1, result.getColumnEps().get("COLTIMESB4").getCurrentNullCount()); + + Assert.assertEquals( + BigInteger.valueOf(10 * 60 * 60 * 1000L + 123), + result.getColumnEps().get("COLTIMESB8").getCurrentMinIntValue()); + Assert.assertEquals( + BigInteger.valueOf(11 * 60 * 60 * 1000L + 15 * 60 * 1000 + 456), + result.getColumnEps().get("COLTIMESB8").getCurrentMaxIntValue()); + Assert.assertEquals(1, result.getColumnEps().get("COLTIMESB8").getCurrentNullCount()); + } } @Test @@ -1005,6 +1077,7 @@ private void testMaxInsertRowsBatchSizeHelper(OpenChannelRequest.OnErrorOption o colBinary.setLogicalType("BINARY"); colBinary.setLength(8 * 1024 * 1024); colBinary.setByteLength(8 * 1024 * 1024); + colBinary.setSourceIcebergDataType("\"binary\""); byte[] arr = new byte[8 * 1024 * 1024]; innerBuffer.setupSchema(Collections.singletonList(colBinary)); @@ -1289,6 +1362,9 @@ private void testE2EBinaryHelper(OpenChannelRequest.OnErrorOption onErrorOption) colBinary.setLength(32); colBinary.setByteLength(256); colBinary.setScale(0); + if (isIcebergMode) { + colBinary.setSourceIcebergDataType("\"binary\""); + } innerBuffer.setupSchema(Collections.singletonList(colBinary)); @@ -1531,9 +1607,11 @@ public void testOnErrorAbortSkipBatch() { @Test public void testE2EVariant() { - testE2EVariantHelper(OpenChannelRequest.OnErrorOption.ABORT); - testE2EVariantHelper(OpenChannelRequest.OnErrorOption.CONTINUE); - testE2EVariantHelper(OpenChannelRequest.OnErrorOption.SKIP_BATCH); + if (!isIcebergMode) { + testE2EVariantHelper(OpenChannelRequest.OnErrorOption.ABORT); + testE2EVariantHelper(OpenChannelRequest.OnErrorOption.CONTINUE); + 
testE2EVariantHelper(OpenChannelRequest.OnErrorOption.SKIP_BATCH); + } } private void testE2EVariantHelper(OpenChannelRequest.OnErrorOption onErrorOption) { @@ -1582,9 +1660,11 @@ private void testE2EVariantHelper(OpenChannelRequest.OnErrorOption onErrorOption @Test public void testE2EObject() { - testE2EObjectHelper(OpenChannelRequest.OnErrorOption.ABORT); - testE2EObjectHelper(OpenChannelRequest.OnErrorOption.CONTINUE); - testE2EObjectHelper(OpenChannelRequest.OnErrorOption.SKIP_BATCH); + if (!isIcebergMode) { + testE2EObjectHelper(OpenChannelRequest.OnErrorOption.ABORT); + testE2EObjectHelper(OpenChannelRequest.OnErrorOption.CONTINUE); + testE2EObjectHelper(OpenChannelRequest.OnErrorOption.SKIP_BATCH); + } } private void testE2EObjectHelper(OpenChannelRequest.OnErrorOption onErrorOption) { @@ -1615,9 +1695,11 @@ private void testE2EObjectHelper(OpenChannelRequest.OnErrorOption onErrorOption) @Test public void testE2EArray() { - testE2EArrayHelper(OpenChannelRequest.OnErrorOption.ABORT); - testE2EArrayHelper(OpenChannelRequest.OnErrorOption.CONTINUE); - testE2EArrayHelper(OpenChannelRequest.OnErrorOption.SKIP_BATCH); + if (!isIcebergMode) { + testE2EArrayHelper(OpenChannelRequest.OnErrorOption.ABORT); + testE2EArrayHelper(OpenChannelRequest.OnErrorOption.CONTINUE); + testE2EArrayHelper(OpenChannelRequest.OnErrorOption.SKIP_BATCH); + } } private void testE2EArrayHelper(OpenChannelRequest.OnErrorOption onErrorOption) { @@ -1697,7 +1779,8 @@ public void testOnErrorAbortRowsWithError() { // insert one valid and one invalid row List> mixedRows = new ArrayList<>(); mixedRows.add(Collections.singletonMap("colChar", "b")); - mixedRows.add(Collections.singletonMap("colChar", "1111111111111111111111")); // too big + mixedRows.add( + Collections.singletonMap("colChar", StringUtils.repeat('1', 16777217))); // too big response = innerBufferOnErrorContinue.insertRows(mixedRows, "1", "3"); Assert.assertTrue(response.hasErrors()); @@ -1707,6 +1790,23 @@ public void testOnErrorAbortRowsWithError() { List> snapshotContinueParquet = ((ParquetChunkData) innerBufferOnErrorContinue.getSnapshot().get()).rows; + if (isIcebergMode) { + // Convert every object to string for iceberg mode + snapshotContinueParquet = + snapshotContinueParquet.stream() + .map( + row -> + row.stream() + .map( + obj -> { + if (obj instanceof byte[]) { + return new String((byte[]) obj, StandardCharsets.UTF_8); + } + return obj; + }) + .collect(Collectors.toList())) + .collect(Collectors.toList()); + } // validRows and only the good row from mixedRows are in the buffer Assert.assertEquals(2, snapshotContinueParquet.size()); Assert.assertEquals(Arrays.asList("a"), snapshotContinueParquet.get(0)); @@ -1714,6 +1814,23 @@ public void testOnErrorAbortRowsWithError() { List> snapshotAbortParquet = ((ParquetChunkData) innerBufferOnErrorAbort.getSnapshot().get()).rows; + if (isIcebergMode) { + // Convert every object to string for iceberg mode + snapshotAbortParquet = + snapshotAbortParquet.stream() + .map( + row -> + row.stream() + .map( + obj -> { + if (obj instanceof byte[]) { + return new String((byte[]) obj, StandardCharsets.UTF_8); + } + return obj; + }) + .collect(Collectors.toList())) + .collect(Collectors.toList()); + } // only validRows and none of the mixedRows are in the buffer Assert.assertEquals(1, snapshotAbortParquet.size()); Assert.assertEquals(Arrays.asList("a"), snapshotAbortParquet.get(0)); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/ParquetValueParserTest.java 
b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeParquetValueParserTest.java similarity index 76% rename from src/test/java/net/snowflake/ingest/streaming/internal/ParquetValueParserTest.java rename to src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeParquetValueParserTest.java index e7c74e3b7..64db57c31 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/ParquetValueParserTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeParquetValueParserTest.java @@ -1,9 +1,9 @@ package net.snowflake.ingest.streaming.internal; import static java.time.ZoneOffset.UTC; -import static net.snowflake.ingest.streaming.internal.ParquetValueParser.BIT_ENCODING_BYTE_LEN; -import static net.snowflake.ingest.streaming.internal.ParquetValueParser.BYTE_ARRAY_LENGTH_ENCODING_BYTE_LEN; -import static net.snowflake.ingest.streaming.internal.ParquetValueParser.DEFINITION_LEVEL_ENCODING_BYTE_LEN; +import static net.snowflake.ingest.streaming.internal.ParquetBufferValue.BIT_ENCODING_BYTE_LEN; +import static net.snowflake.ingest.streaming.internal.ParquetBufferValue.BYTE_ARRAY_LENGTH_ENCODING_BYTE_LEN; +import static net.snowflake.ingest.streaming.internal.ParquetBufferValue.DEFINITION_LEVEL_ENCODING_BYTE_LEN; import java.math.BigDecimal; import java.math.BigInteger; @@ -16,7 +16,7 @@ import org.junit.Assert; import org.junit.Test; -public class ParquetValueParserTest { +public class SnowflakeParquetValueParserTest { @Test public void parseValueFixedSB1ToInt32() { @@ -30,8 +30,8 @@ public void parseValueFixedSB1ToInt32() { .build(); RowBufferStats rowBufferStats = new RowBufferStats("COL1"); - ParquetValueParser.ParquetBufferValue pv = - ParquetValueParser.parseColumnValueToParquet( + ParquetBufferValue pv = + SnowflakeParquetValueParser.parseColumnValueToParquet( 12, testCol, PrimitiveType.PrimitiveTypeName.INT32, @@ -62,8 +62,8 @@ public void parseValueFixedSB2ToInt32() { .build(); RowBufferStats rowBufferStats = new RowBufferStats("COL1"); - ParquetValueParser.ParquetBufferValue pv = - ParquetValueParser.parseColumnValueToParquet( + ParquetBufferValue pv = + SnowflakeParquetValueParser.parseColumnValueToParquet( 1234, testCol, PrimitiveType.PrimitiveTypeName.INT32, @@ -94,8 +94,8 @@ public void parseValueFixedSB4ToInt32() { .build(); RowBufferStats rowBufferStats = new RowBufferStats("COL1"); - ParquetValueParser.ParquetBufferValue pv = - ParquetValueParser.parseColumnValueToParquet( + ParquetBufferValue pv = + SnowflakeParquetValueParser.parseColumnValueToParquet( 123456789, testCol, PrimitiveType.PrimitiveTypeName.INT32, @@ -126,8 +126,8 @@ public void parseValueFixedSB8ToInt64() { .build(); RowBufferStats rowBufferStats = new RowBufferStats("COL1"); - ParquetValueParser.ParquetBufferValue pv = - ParquetValueParser.parseColumnValueToParquet( + ParquetBufferValue pv = + SnowflakeParquetValueParser.parseColumnValueToParquet( 123456789987654321L, testCol, PrimitiveType.PrimitiveTypeName.INT64, @@ -158,8 +158,8 @@ public void parseValueFixedSB16ToByteArray() { .build(); RowBufferStats rowBufferStats = new RowBufferStats("COL1"); - ParquetValueParser.ParquetBufferValue pv = - ParquetValueParser.parseColumnValueToParquet( + ParquetBufferValue pv = + SnowflakeParquetValueParser.parseColumnValueToParquet( new BigDecimal("91234567899876543219876543211234567891"), testCol, PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, @@ -173,7 +173,7 @@ public void parseValueFixedSB16ToByteArray() { .rowBufferStats(rowBufferStats) .expectedValueClass(byte[].class) 
.expectedParsedValue( - ParquetValueParser.getSb16Bytes( + SnowflakeParquetValueParser.getSb16Bytes( new BigInteger("91234567899876543219876543211234567891"))) .expectedSize(16.0f + DEFINITION_LEVEL_ENCODING_BYTE_LEN) .expectedMinMax(new BigInteger("91234567899876543219876543211234567891")) @@ -192,8 +192,8 @@ public void parseValueFixedDecimalToInt32() { .build(); RowBufferStats rowBufferStats = new RowBufferStats("COL1"); - ParquetValueParser.ParquetBufferValue pv = - ParquetValueParser.parseColumnValueToParquet( + ParquetBufferValue pv = + SnowflakeParquetValueParser.parseColumnValueToParquet( new BigDecimal("12345.54321"), testCol, PrimitiveType.PrimitiveTypeName.DOUBLE, @@ -222,8 +222,8 @@ public void parseValueDouble() { .build(); RowBufferStats rowBufferStats = new RowBufferStats("COL1"); - ParquetValueParser.ParquetBufferValue pv = - ParquetValueParser.parseColumnValueToParquet( + ParquetBufferValue pv = + SnowflakeParquetValueParser.parseColumnValueToParquet( 12345.54321d, testCol, PrimitiveType.PrimitiveTypeName.DOUBLE, @@ -252,8 +252,8 @@ public void parseValueBoolean() { .build(); RowBufferStats rowBufferStats = new RowBufferStats("COL1"); - ParquetValueParser.ParquetBufferValue pv = - ParquetValueParser.parseColumnValueToParquet( + ParquetBufferValue pv = + SnowflakeParquetValueParser.parseColumnValueToParquet( true, testCol, PrimitiveType.PrimitiveTypeName.BOOLEAN, @@ -282,8 +282,8 @@ public void parseValueBinary() { .build(); RowBufferStats rowBufferStats = new RowBufferStats("COL1"); - ParquetValueParser.ParquetBufferValue pv = - ParquetValueParser.parseColumnValueToParquet( + ParquetBufferValue pv = + SnowflakeParquetValueParser.parseColumnValueToParquet( "1234abcd".getBytes(), testCol, PrimitiveType.PrimitiveTypeName.BINARY, @@ -327,8 +327,8 @@ private void testJsonWithLogicalType(String logicalType, boolean enableNewJsonPa "{\"key1\":-879869596,\"key2\":\"value2\",\"key3\":null," + "\"key4\":{\"key41\":0.032437,\"key42\":\"value42\",\"key43\":null}}"; RowBufferStats rowBufferStats = new RowBufferStats("COL1"); - ParquetValueParser.ParquetBufferValue pv = - ParquetValueParser.parseColumnValueToParquet( + ParquetBufferValue pv = + SnowflakeParquetValueParser.parseColumnValueToParquet( var, testCol, PrimitiveType.PrimitiveTypeName.BINARY, @@ -377,8 +377,8 @@ private void testNullJsonWithLogicalType(String var, boolean enableNewJsonParsin .build(); RowBufferStats rowBufferStats = new RowBufferStats("COL1"); - ParquetValueParser.ParquetBufferValue pv = - ParquetValueParser.parseColumnValueToParquet( + ParquetBufferValue pv = + SnowflakeParquetValueParser.parseColumnValueToParquet( var, testCol, PrimitiveType.PrimitiveTypeName.BINARY, @@ -418,8 +418,8 @@ public void parseValueArrayToBinaryInternal(boolean enableNewJsonParsingLogic) { input.put("c", "3"); RowBufferStats rowBufferStats = new RowBufferStats("COL1"); - ParquetValueParser.ParquetBufferValue pv = - ParquetValueParser.parseColumnValueToParquet( + ParquetBufferValue pv = + SnowflakeParquetValueParser.parseColumnValueToParquet( input, testCol, PrimitiveType.PrimitiveTypeName.BINARY, @@ -456,8 +456,8 @@ public void parseValueTextToBinary() { String text = "This is a sample text! 
Length is bigger than 32 bytes :)"; RowBufferStats rowBufferStats = new RowBufferStats("COL1"); - ParquetValueParser.ParquetBufferValue pv = - ParquetValueParser.parseColumnValueToParquet( + ParquetBufferValue pv = + SnowflakeParquetValueParser.parseColumnValueToParquet( text, testCol, PrimitiveType.PrimitiveTypeName.BINARY, @@ -497,7 +497,7 @@ public void parseValueTimestampNtzSB4Error() { Assert.assertThrows( SFException.class, () -> - ParquetValueParser.parseColumnValueToParquet( + SnowflakeParquetValueParser.parseColumnValueToParquet( "2013-04-28 20:57:00", testCol, PrimitiveType.PrimitiveTypeName.INT32, @@ -521,8 +521,8 @@ public void parseValueTimestampNtzSB8ToINT64() { .build(); RowBufferStats rowBufferStats = new RowBufferStats("COL1"); - ParquetValueParser.ParquetBufferValue pv = - ParquetValueParser.parseColumnValueToParquet( + ParquetBufferValue pv = + SnowflakeParquetValueParser.parseColumnValueToParquet( "2013-04-28T20:57:01.000", testCol, PrimitiveType.PrimitiveTypeName.INT64, @@ -552,8 +552,8 @@ public void parseValueTimestampNtzSB16ToByteArray() { .build(); RowBufferStats rowBufferStats = new RowBufferStats("COL1"); - ParquetValueParser.ParquetBufferValue pv = - ParquetValueParser.parseColumnValueToParquet( + ParquetBufferValue pv = + SnowflakeParquetValueParser.parseColumnValueToParquet( "2022-09-18T22:05:07.123456789", testCol, PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, @@ -567,7 +567,7 @@ public void parseValueTimestampNtzSB16ToByteArray() { .rowBufferStats(rowBufferStats) .expectedValueClass(byte[].class) .expectedParsedValue( - ParquetValueParser.getSb16Bytes(BigInteger.valueOf(1663538707123456789L))) + SnowflakeParquetValueParser.getSb16Bytes(BigInteger.valueOf(1663538707123456789L))) .expectedSize(16.0f + DEFINITION_LEVEL_ENCODING_BYTE_LEN) .expectedMinMax(BigInteger.valueOf(1663538707123456789L)) .assertMatches(); @@ -584,8 +584,8 @@ public void parseValueDateToInt32() { .build(); RowBufferStats rowBufferStats = new RowBufferStats("COL1"); - ParquetValueParser.ParquetBufferValue pv = - ParquetValueParser.parseColumnValueToParquet( + ParquetBufferValue pv = + SnowflakeParquetValueParser.parseColumnValueToParquet( "2021-01-01", testCol, PrimitiveType.PrimitiveTypeName.INT32, @@ -615,8 +615,8 @@ public void parseValueTimeSB4ToInt32() { .build(); RowBufferStats rowBufferStats = new RowBufferStats("COL1"); - ParquetValueParser.ParquetBufferValue pv = - ParquetValueParser.parseColumnValueToParquet( + ParquetBufferValue pv = + SnowflakeParquetValueParser.parseColumnValueToParquet( "01:00:00", testCol, PrimitiveType.PrimitiveTypeName.INT32, @@ -646,8 +646,8 @@ public void parseValueTimeSB8ToInt64() { .build(); RowBufferStats rowBufferStats = new RowBufferStats("COL1"); - ParquetValueParser.ParquetBufferValue pv = - ParquetValueParser.parseColumnValueToParquet( + ParquetBufferValue pv = + SnowflakeParquetValueParser.parseColumnValueToParquet( "01:00:00.123", testCol, PrimitiveType.PrimitiveTypeName.INT64, @@ -681,7 +681,7 @@ public void parseValueTimeSB16Error() { Assert.assertThrows( SFException.class, () -> - ParquetValueParser.parseColumnValueToParquet( + SnowflakeParquetValueParser.parseColumnValueToParquet( "11:00:00.12345678", testCol, PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, @@ -692,99 +692,4 @@ public void parseValueTimeSB16Error() { Assert.assertEquals( "Unknown data type for logical: TIME, physical: SB16.", exception.getMessage()); } - - /** Builder that helps to assert parsing of values to parquet types */ - private static class 
ParquetValueParserAssertionBuilder { - private ParquetValueParser.ParquetBufferValue parquetBufferValue; - private RowBufferStats rowBufferStats; - private Class valueClass; - private Object value; - private float size; - private Object minMaxStat; - private long currentNullCount; - - static ParquetValueParserAssertionBuilder newBuilder() { - ParquetValueParserAssertionBuilder builder = new ParquetValueParserAssertionBuilder(); - return builder; - } - - ParquetValueParserAssertionBuilder parquetBufferValue( - ParquetValueParser.ParquetBufferValue parquetBufferValue) { - this.parquetBufferValue = parquetBufferValue; - return this; - } - - ParquetValueParserAssertionBuilder rowBufferStats(RowBufferStats rowBufferStats) { - this.rowBufferStats = rowBufferStats; - return this; - } - - ParquetValueParserAssertionBuilder expectedValueClass(Class valueClass) { - this.valueClass = valueClass; - return this; - } - - ParquetValueParserAssertionBuilder expectedParsedValue(Object value) { - this.value = value; - return this; - } - - ParquetValueParserAssertionBuilder expectedSize(float size) { - this.size = size; - return this; - } - - public ParquetValueParserAssertionBuilder expectedMinMax(Object minMaxStat) { - this.minMaxStat = minMaxStat; - return this; - } - - public ParquetValueParserAssertionBuilder expectedNullCount(long currentNullCount) { - this.currentNullCount = currentNullCount; - return this; - } - - void assertMatches() { - Assert.assertEquals(valueClass, parquetBufferValue.getValue().getClass()); - if (valueClass.equals(byte[].class)) { - Assert.assertArrayEquals((byte[]) value, (byte[]) parquetBufferValue.getValue()); - } else { - Assert.assertEquals(value, parquetBufferValue.getValue()); - } - Assert.assertEquals(size, parquetBufferValue.getSize(), 0); - if (minMaxStat instanceof BigInteger) { - Assert.assertEquals(minMaxStat, rowBufferStats.getCurrentMinIntValue()); - Assert.assertEquals(minMaxStat, rowBufferStats.getCurrentMaxIntValue()); - return; - } else if (minMaxStat instanceof byte[]) { - Assert.assertArrayEquals((byte[]) minMaxStat, rowBufferStats.getCurrentMinStrValue()); - Assert.assertArrayEquals((byte[]) minMaxStat, rowBufferStats.getCurrentMaxStrValue()); - return; - } else if (valueClass.equals(String.class)) { - // String can have null min/max stats for variant data types - Object min = - rowBufferStats.getCurrentMinStrValue() != null - ? new String(rowBufferStats.getCurrentMinStrValue(), StandardCharsets.UTF_8) - : rowBufferStats.getCurrentMinStrValue(); - Object max = - rowBufferStats.getCurrentMaxStrValue() != null - ? 
new String(rowBufferStats.getCurrentMaxStrValue(), StandardCharsets.UTF_8) - : rowBufferStats.getCurrentMaxStrValue(); - Assert.assertEquals(minMaxStat, min); - Assert.assertEquals(minMaxStat, max); - return; - } else if (minMaxStat instanceof Double || minMaxStat instanceof BigDecimal) { - Assert.assertEquals(minMaxStat, rowBufferStats.getCurrentMinRealValue()); - Assert.assertEquals(minMaxStat, rowBufferStats.getCurrentMaxRealValue()); - return; - } - throw new IllegalArgumentException( - String.format("Unknown data type for min stat: %s", minMaxStat.getClass())); - } - - void assertNull() { - Assert.assertNull(parquetBufferValue.getValue()); - Assert.assertEquals(currentNullCount, rowBufferStats.getCurrentNullCount()); - } - } } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeServiceClientTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeServiceClientTest.java index 0e113c0ab..4815a5469 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeServiceClientTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeServiceClientTest.java @@ -186,6 +186,7 @@ public void testOpenChannel() throws IngestResponseException, IOException { "test_table", "test_channel", Constants.WriteMode.CLOUD_STORAGE, + false, "test_offset_token"); OpenChannelResponse openChannelResponse = snowflakeServiceClient.openChannel(openChannelRequest); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java index 9495b133e..d576c64fd 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java @@ -632,6 +632,9 @@ public void testInsertTooLargeRow() { col.setLogicalType("BINARY"); col.setLength(8388608); col.setByteLength(8388608); + if (isIcebergMode) { + col.setSourceIcebergDataType("\"binary\""); + } return col; }) .collect(Collectors.toList());
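
Notes on the test changes, for review:

The extracted ParquetValueParserAssertionBuilder is now shared by the parser test suites instead of living as a private inner class of each one. A minimal usage sketch, assuming pv and rowBufferStats come out of a parseColumnValueToParquet call exactly as in the tests above; the expectedSize expression is illustrative rather than lifted from the patch:

    // Fluent assertions over one parsed value and the stats it updated.
    ParquetValueParserAssertionBuilder.newBuilder()
        .parquetBufferValue(pv)                 // ParquetBufferValue under test
        .rowBufferStats(rowBufferStats)         // stats object the parser updated
        .expectedValueClass(Integer.class)      // class of the buffered value
        .expectedParsedValue(12)
        .expectedSize(4.0f + DEFINITION_LEVEL_ENCODING_BYTE_LEN)
        .expectedMinMax(BigInteger.valueOf(12)) // a BigInteger routes to the int min/max stats
        .assertMatches();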
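The setSourceIcebergDataType literals carry escaped quotes ("\"int\"", "\"decimal(38,2)\"", "\"timestamptz\"") because the source type is stored as a JSON value, which matches Iceberg's schema serialization where a primitive type is a JSON string:

    // Each Java string literal is a JSON document whose value is the type name.
    colInt.setSourceIcebergDataType("\"int\"");               // JSON: "int"
    colDecimal.setSourceIcebergDataType("\"decimal(38,2)\""); // JSON: "decimal(38,2)"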
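The forked expectations in the time and timestamp tests all encode one rule: Iceberg time and timestamptz values are buffered in microseconds regardless of the column's declared scale, while FDN columns keep that scale (and the SB16 nanosecond assertions are skipped, since Iceberg tops out at microsecond precision). Spelled out with the constants used above, as plain arithmetic:

    // FDN TIME(3): 10:00:00.123 is buffered at the declared scale (milliseconds).
    long fdnTimeMillis = 10 * 60 * 60 * 1000L + 123;    // 36_000_123

    // Iceberg time: the same wall-clock value, always in microseconds.
    long icebergTimeMicros = fdnTimeMillis * 1000L;     // 36_000_123_000

    // Iceberg timestamptz: epoch seconds scale up to epoch microseconds,
    // hence the extra 1000000L factor on the COLTIMESTAMPLTZ_SB8 min/max stats.
    long icebergTsMicros = 1621899220L * 1000000L;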
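The colChar "too big" fixtures grow from a 22-character literal to StringUtils.repeat('1', 16777217) because Iceberg strings have no declared per-column length to trip on; the limit both modes share is the 16 MiB ingestion cap, and the expected error is accordingly rephrased in bytes rather than characters:

    // One byte over the cap: '1' is a single UTF-8 byte, so
    // 16_777_217 chars == 16_777_217 bytes > 16 * 1024 * 1024 bytes.
    String tooBig = StringUtils.repeat('1', 16 * 1024 * 1024 + 1);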
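Finally, the two identical stream blocks in testOnErrorAbortRowsWithError exist because, in Iceberg mode, string columns come back from the Parquet buffer as UTF-8 byte[] rather than String. If the duplication grows, a helper of this hypothetical shape (not part of the patch) could be shared by both snapshots:

    /** Decodes UTF-8 byte[] cells so snapshots compare against plain strings. */
    private static List<List<Object>> normalizeRows(List<List<Object>> rows) {
      return rows.stream()
          .map(
              row ->
                  row.stream()
                      .map(
                          obj ->
                              obj instanceof byte[]
                                  ? new String((byte[]) obj, StandardCharsets.UTF_8)
                                  : obj)
                      .collect(Collectors.toList()))
          .collect(Collectors.toList());
    }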