SNOW-1507007 Support schema for new table format #814

Merged: 6 commits, merged Sep 19, 2024
59 changes: 57 additions & 2 deletions pom.xml
@@ -1,4 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
@@ -48,6 +51,7 @@
<fasterxml.version>2.17.2</fasterxml.version>
<guava.version>32.0.1-jre</guava.version>
<hadoop.version>3.3.6</hadoop.version>
<iceberg.version>1.3.1</iceberg.version>
<jacoco.skip.instrument>true</jacoco.skip.instrument>
<jacoco.version>0.8.5</jacoco.version>
<license.processing.dependencyJarsDir>${project.build.directory}/dependency-jars</license.processing.dependencyJarsDir>
@@ -262,6 +266,21 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-api</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-core</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-parquet</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-column</artifactId>
@@ -506,6 +525,34 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-core</artifactId>
<exclusions>
<exclusion>
<groupId>io.airlift</groupId>
<artifactId>aircompressor</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-parquet</artifactId>
<exclusions>
<exclusion>
<groupId>io.airlift</groupId>
<artifactId>aircompressor</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.parquet/parquet-hadoop -->
<dependency>
<groupId>org.apache.parquet</groupId>
@@ -901,9 +948,10 @@
|Apache-2.0
|Apache License, Version 2.0
|Apache 2.0
|Apache License V2.0</licenseMerge>
|Apache License V2.0
|Apache 2</licenseMerge>
<licenseMerge>BSD 2-Clause License
|The BSD License</licenseMerge>
|The BSD License
|BSD</licenseMerge>
<licenseMerge>The MIT License|MIT License</licenseMerge>
<licenseMerge>3-Clause BSD License|BSD-3-Clause</licenseMerge>
</licenseMerges>
@@ -1146,6 +1194,10 @@
<pattern>io.airlift.compress</pattern>
<shadedPattern>${shadeBase}.io.airlift.compress</shadedPattern>
</relocation>
<relocation>
<pattern>org.roaringbitmap</pattern>
<shadedPattern>${shadeBase}.org.roaringbitmap</shadedPattern>
</relocation>
</relocations>
<filters>
<filter>
@@ -1164,6 +1216,9 @@
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
<exclude>LICENSE</exclude>
<exclude>NOTICE</exclude>
<exclude>iceberg-build.properties</exclude>
<exclude>google/protobuf/**/*.proto</exclude>
</excludes>
</filter>
5 changes: 4 additions & 1 deletion scripts/process_licenses.py
@@ -61,6 +61,9 @@
"org.bouncycastle:bcpkix-jdk18on": BOUNCY_CASTLE_LICENSE,
"org.bouncycastle:bcutil-jdk18on": BOUNCY_CASTLE_LICENSE,
"org.bouncycastle:bcprov-jdk18on": BOUNCY_CASTLE_LICENSE,
"com.thoughtworks.paranamer:paranamer": BSD_2_CLAUSE_LICENSE,
"org.roaringbitmap:RoaringBitmap": APACHE_LICENSE,
Collaborator: Can't find this library being used anywhere; why is this change needed?

Contributor Author: It's a transitive dependency from org.apache.iceberg:iceberg-core:

[INFO] +- org.apache.iceberg:iceberg-core:jar:1.3.1:compile
[INFO] |  +- org.apache.iceberg:iceberg-common:jar:1.3.1:runtime
[INFO] |  +- org.apache.avro:avro:jar:1.11.1:runtime
[INFO] |  +- org.apache.httpcomponents.client5:httpclient5:jar:5.2.1:runtime
[INFO] |  |  +- org.apache.httpcomponents.core5:httpcore5:jar:5.2:runtime
[INFO] |  |  \- org.apache.httpcomponents.core5:httpcore5-h2:jar:5.2:runtime
[INFO] |  \- org.roaringbitmap:RoaringBitmap:jar:0.9.44:runtime
[INFO] |     \- org.roaringbitmap:shims:jar:0.9.44:runtime

"org.roaringbitmap:shims": APACHE_LICENSE,
}


@@ -115,7 +118,7 @@ def main():
for zip_info in current_jar_as_zip.infolist():
if zip_info.is_dir():
continue
if zip_info.filename in ("META-INF/LICENSE.txt", "META-INF/LICENSE", "META-INF/LICENSE.md"):
if zip_info.filename in ("META-INF/LICENSE.txt", "META-INF/LICENSE", "META-INF/LICENSE.md", "LICENSE"):
license_found = True
dependency_with_license_count += 1
# Extract license to the target directory
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023 Snowflake Computing Inc. All rights reserved.
* Copyright (c) 2023-2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;
@@ -18,21 +18,26 @@ public class ClientBufferParameters {

private Constants.BdecParquetCompression bdecParquetCompression;

private boolean isIcebergMode;

/**
* Private constructor used for test methods
*
* @param maxChunkSizeInBytes maximum chunk size in bytes
* @param maxAllowedRowSizeInBytes maximum row size in bytes
* @param isIcebergMode
*/
private ClientBufferParameters(
long maxChunkSizeInBytes,
long maxAllowedRowSizeInBytes,
Constants.BdecParquetCompression bdecParquetCompression,
boolean enableNewJsonParsingLogic) {
boolean enableNewJsonParsingLogic,
boolean isIcebergMode) {
this.maxChunkSizeInBytes = maxChunkSizeInBytes;
this.maxAllowedRowSizeInBytes = maxAllowedRowSizeInBytes;
this.bdecParquetCompression = bdecParquetCompression;
this.enableNewJsonParsingLogic = enableNewJsonParsingLogic;
this.isIcebergMode = isIcebergMode;
}

/** @param clientInternal reference to the client object where the relevant parameters are set */
@@ -49,28 +54,34 @@ public ClientBufferParameters(SnowflakeStreamingIngestClientInternal clientInter
clientInternal != null
? clientInternal.getParameterProvider().getBdecParquetCompressionAlgorithm()
: ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT;

this.enableNewJsonParsingLogic =
clientInternal != null
? clientInternal.getParameterProvider().isEnableNewJsonParsingLogic()
: ParameterProvider.ENABLE_NEW_JSON_PARSING_LOGIC_DEFAULT;
this.isIcebergMode =
clientInternal != null
? clientInternal.isIcebergMode()
: ParameterProvider.IS_ICEBERG_MODE_DEFAULT;
}

/**
* @param maxChunkSizeInBytes maximum chunk size in bytes
* @param maxAllowedRowSizeInBytes maximum row size in bytes
* @param isIcebergMode
* @return ClientBufferParameters object
*/
public static ClientBufferParameters test_createClientBufferParameters(
long maxChunkSizeInBytes,
long maxAllowedRowSizeInBytes,
Constants.BdecParquetCompression bdecParquetCompression,
boolean enableNewJsonParsingLogic) {
boolean enableNewJsonParsingLogic,
boolean isIcebergMode) {
return new ClientBufferParameters(
maxChunkSizeInBytes,
maxAllowedRowSizeInBytes,
bdecParquetCompression,
enableNewJsonParsingLogic);
enableNewJsonParsingLogic,
isIcebergMode);
}

public long getMaxChunkSizeInBytes() {
@@ -88,4 +99,8 @@ public Constants.BdecParquetCompression getBdecParquetCompression() {
public boolean isEnableNewJsonParsingLogic() {
return enableNewJsonParsingLogic;
}

public boolean getIsIcebergMode() {
return isIcebergMode;
}
}
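
For orientation, a minimal sketch of how the updated test factory might be invoked. The argument values, including the GZIP compression constant, are illustrative assumptions, not taken from this PR:

    // Hypothetical usage; argument values are illustrative only.
    ClientBufferParameters params =
        ClientBufferParameters.test_createClientBufferParameters(
            64 * 1024 * 1024, // maxChunkSizeInBytes
            32 * 1024 * 1024, // maxAllowedRowSizeInBytes
            Constants.BdecParquetCompression.GZIP, // assumed enum constant
            true, // enableNewJsonParsingLogic
            true); // isIcebergMode
    assert params.getIsIcebergMode();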
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021 Snowflake Computing Inc. All rights reserved.
* Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;
@@ -22,6 +22,13 @@ class ColumnMetadata {
private boolean nullable;
private String collation;

/**
* The JSON serialization of the Iceberg data type of the column; see <a
* href="https://iceberg.apache.org/spec/#appendix-c-json-serialization">JSON serialization</a>
* for more details.
*/
private String sourceIcebergDataType;

/**
* The column ordinal is an internal id of the column used by server scanner for the column
* identification.
@@ -128,6 +135,15 @@ public Integer getOrdinal() {
return ordinal;
}

@JsonProperty("source_iceberg_data_type")
void setSourceIcebergDataType(String sourceIcebergDataType) {
this.sourceIcebergDataType = sourceIcebergDataType;
}

public String getSourceIcebergDataType() {
return sourceIcebergDataType;
}

String getInternalName() {
return internalName;
}
@@ -144,6 +160,7 @@ public String toString() {
map.put("byte_length", this.byteLength);
map.put("length", this.length);
map.put("nullable", this.nullable);
map.put("source_iceberg_datatype", this.sourceIcebergDataType);
return map.toString();
}
}
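
For illustration, the linked spec serializes a primitive type as a plain JSON string and a nested type as a JSON object, so source_iceberg_data_type would hold values like the following (hypothetical examples, not taken from this PR):

    "int"
    "decimal(10, 2)"
    {"type": "list", "element-id": 3, "element": "string", "element-required": true}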
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021 Snowflake Computing Inc. All rights reserved.
* Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;
@@ -19,6 +19,7 @@
import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.math.RoundingMode;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDate;
@@ -718,7 +719,13 @@ static BigDecimal validateAndParseBigDecimal(
|| input instanceof Long) {
return BigDecimal.valueOf(((Number) input).longValue());
} else if (input instanceof Float || input instanceof Double) {
return BigDecimal.valueOf(((Number) input).doubleValue());
try {
return BigDecimal.valueOf(((Number) input).doubleValue());
} catch (NumberFormatException e) {
/* NaN and infinity are not allowed */
throw valueFormatNotAllowedException(
columnName, "NUMBER", "Not a valid number", insertRowIndex);
}
} else if (input instanceof String) {
try {
final String stringInput = ((String) input).trim();
@@ -957,6 +964,66 @@ static double validateAndParseReal(String columnName, Object input, long insertR
columnName, input.getClass(), "REAL", new String[] {"Number", "String"}, insertRowIndex);
}

/**
* Validates and parses input Iceberg INT column. Allowed Java types:
*
* <ul>
* <li>Number
* <li>String
* </ul>
*
* @param columnName Column name, used in validation error messages
* @param input Object to validate and parse
* @param insertRowIndex Row index for error reporting
* @return Parsed integer
*/
static int validateAndParseIcebergInt(String columnName, Object input, long insertRowIndex) {
Contributor: Should we add unit tests for these functions that focus on corner cases? Min and max values, Double/Float NaN and positive/negative infinity, big integers and big decimals outside of the allowed range, etc.?

Contributor Author: Added tests here.

BigDecimal roundedValue =
validateAndParseBigDecimal(columnName, input, insertRowIndex)
.setScale(0, RoundingMode.HALF_UP);
try {
return roundedValue.intValueExact();
} catch (ArithmeticException e) {
/* overflow */
throw new SFException(
ErrorCode.INVALID_VALUE_ROW,
String.format(
"Number out of representable inclusive range of integers between %d and %d,"
+ " rowIndex:%d",
Integer.MIN_VALUE, Integer.MAX_VALUE, insertRowIndex));
}
}

/**
* Validates and parses input Iceberg LONG column. Allowed Java types:
*
* <ul>
* <li>Number
* <li>String
* </ul>
*
* @param columnName Column name, used in validation error messages
* @param input Object to validate and parse
* @param insertRowIndex Row index for error reporting
* @return Parsed long
*/
static long validateAndParseIcebergLong(String columnName, Object input, long insertRowIndex) {
BigDecimal roundedValue =
validateAndParseBigDecimal(columnName, input, insertRowIndex)
.setScale(0, RoundingMode.HALF_UP);
try {
return roundedValue.longValueExact();
} catch (ArithmeticException e) {
/* overflow */
throw new SFException(
ErrorCode.INVALID_VALUE_ROW,
String.format(
"Number out of representable inclusive range of integers between %d and %d,"
+ " rowIndex:%d",
Long.MIN_VALUE, Long.MAX_VALUE, insertRowIndex));
}
}
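
A minimal, self-contained sketch of the semantics these helpers implement; it mirrors, rather than calls, the package-private methods above and the NaN fix in validateAndParseBigDecimal:

    import java.math.BigDecimal;
    import java.math.RoundingMode;

    public class IcebergIntSemanticsSketch {
      public static void main(String[] args) {
        // 2.5 rounds half-up to 3 before the exact conversion to int.
        System.out.println(
            BigDecimal.valueOf(2.5).setScale(0, RoundingMode.HALF_UP).intValueExact()); // 3

        // Values outside the int range make intValueExact() throw ArithmeticException,
        // which the helper translates into an SFException(INVALID_VALUE_ROW).
        try {
          BigDecimal.valueOf(Integer.MAX_VALUE + 1L)
              .setScale(0, RoundingMode.HALF_UP)
              .intValueExact();
        } catch (ArithmeticException e) {
          System.out.println("overflow: " + e);
        }

        // BigDecimal.valueOf(Double.NaN) throws NumberFormatException, which is why
        // validateAndParseBigDecimal now wraps its double branch in try/catch.
        try {
          BigDecimal.valueOf(Double.NaN);
        } catch (NumberFormatException e) {
          System.out.println("NaN rejected: " + e);
        }
      }
    }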

/**
* Validate and parse input to integer output, 1=true, 0=false. String values converted to boolean
* according to https://docs.snowflake.com/en/sql-reference/functions/to_boolean.html#usage-notes
Expand Down Expand Up @@ -1003,6 +1070,16 @@ static void checkValueInRange(
}
}

static void checkFixedLengthByteArray(byte[] bytes, int length, final long insertRowIndex) {
if (bytes.length != length) {
throw new SFException(
ErrorCode.INVALID_FORMAT_ROW,
String.format(
"Binary length mismatch: expected=%d, actual=%d, rowIndex:%d",
length, bytes.length, insertRowIndex));
}
}

static Set<String> allowedBooleanStringsLowerCased =
Sets.newHashSet("1", "0", "yes", "no", "y", "n", "t", "f", "true", "false", "on", "off");
