SNOW-1507007 Support schema for new table format (#814)
Today we use only FDN-specific logical and physical data types. This PR changes the client to use Iceberg's data types, so that no signal is lost between the table schema on the server and the data type conversions done in the client.
sfc-gh-alhuang authored Sep 19, 2024
1 parent a107b50 commit 7eb7ba5
Showing 30 changed files with 2,240 additions and 510 deletions.
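
To make the schema exchange concrete, here is a minimal sketch of Iceberg's JSON schema serialization, assuming only the iceberg-core dependency added in this PR; the payload, field names, and values are illustrative, not taken from this change:

import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;

public class IcebergSchemaSketch {
  public static void main(String[] args) {
    // Iceberg serializes a table schema as a struct of typed fields; primitive
    // types appear as plain strings such as "long" or "decimal(10,2)".
    String schemaJson =
        "{\"type\":\"struct\",\"fields\":["
            + "{\"id\":1,\"name\":\"id\",\"required\":true,\"type\":\"long\"},"
            + "{\"id\":2,\"name\":\"price\",\"required\":false,\"type\":\"decimal(10,2)\"}]}";
    Schema schema = SchemaParser.fromJson(schemaJson);
    System.out.println(schema.findType("price")); // prints decimal(10, 2)
  }
}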
59 changes: 57 additions & 2 deletions pom.xml
@@ -1,4 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
@@ -48,6 +51,7 @@
<fasterxml.version>2.17.2</fasterxml.version>
<guava.version>32.0.1-jre</guava.version>
<hadoop.version>3.3.6</hadoop.version>
<iceberg.version>1.3.1</iceberg.version>
<jacoco.skip.instrument>true</jacoco.skip.instrument>
<jacoco.version>0.8.5</jacoco.version>
<license.processing.dependencyJarsDir>${project.build.directory}/dependency-jars</license.processing.dependencyJarsDir>
@@ -262,6 +266,21 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-api</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-core</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-parquet</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-column</artifactId>
@@ -506,6 +525,34 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-core</artifactId>
<exclusions>
<exclusion>
<groupId>io.airlift</groupId>
<artifactId>aircompressor</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-parquet</artifactId>
<exclusions>
<exclusion>
<groupId>io.airlift</groupId>
<artifactId>aircompressor</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.parquet/parquet-hadoop -->
<dependency>
<groupId>org.apache.parquet</groupId>
@@ -901,9 +948,10 @@
|Apache-2.0
|Apache License, Version 2.0
|Apache 2.0
|Apache License V2.0</licenseMerge>
|Apache License V2.0
|Apache 2</licenseMerge>
<licenseMerge>BSD 2-Clause License
|The BSD License</licenseMerge>
|The BSD License
|BSD</licenseMerge>
<licenseMerge>The MIT License|MIT License</licenseMerge>
<licenseMerge>3-Clause BSD License|BSD-3-Clause</licenseMerge>
</licenseMerges>
@@ -1146,6 +1194,10 @@
<pattern>io.airlift.compress</pattern>
<shadedPattern>${shadeBase}.io.airlift.compress</shadedPattern>
</relocation>
<relocation>
<pattern>org.roaringbitmap</pattern>
<shadedPattern>${shadeBase}.org.roaringbitmap</shadedPattern>
</relocation>
</relocations>
<filters>
<filter>
@@ -1164,6 +1216,9 @@
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
<exclude>LICENSE</exclude>
<exclude>NOTICE</exclude>
<exclude>iceberg-build.properties</exclude>
<exclude>google/protobuf/**/*.proto</exclude>
</excludes>
</filter>
5 changes: 4 additions & 1 deletion scripts/process_licenses.py
@@ -61,6 +61,9 @@
"org.bouncycastle:bcpkix-jdk18on": BOUNCY_CASTLE_LICENSE,
"org.bouncycastle:bcutil-jdk18on": BOUNCY_CASTLE_LICENSE,
"org.bouncycastle:bcprov-jdk18on": BOUNCY_CASTLE_LICENSE,
"com.thoughtworks.paranamer:paranamer": BSD_2_CLAUSE_LICENSE,
"org.roaringbitmap:RoaringBitmap": APACHE_LICENSE,
"org.roaringbitmap:shims": APACHE_LICENSE,
}


@@ -115,7 +118,7 @@ def main():
for zip_info in current_jar_as_zip.infolist():
if zip_info.is_dir():
continue
if zip_info.filename in ("META-INF/LICENSE.txt", "META-INF/LICENSE", "META-INF/LICENSE.md"):
if zip_info.filename in ("META-INF/LICENSE.txt", "META-INF/LICENSE", "META-INF/LICENSE.md", "LICENSE"):
license_found = True
dependency_with_license_count += 1
# Extract license to the target directory
src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023 Snowflake Computing Inc. All rights reserved.
* Copyright (c) 2023-2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;
@@ -18,21 +18,26 @@ public class ClientBufferParameters {

private Constants.BdecParquetCompression bdecParquetCompression;

private boolean isIcebergMode;

/**
* Private constructor used for test methods
*
* @param maxChunkSizeInBytes maximum chunk size in bytes
* @param maxAllowedRowSizeInBytes maximum row size in bytes
* @param isIcebergMode whether the client is ingesting into Iceberg tables
*/
private ClientBufferParameters(
long maxChunkSizeInBytes,
long maxAllowedRowSizeInBytes,
Constants.BdecParquetCompression bdecParquetCompression,
boolean enableNewJsonParsingLogic) {
boolean enableNewJsonParsingLogic,
boolean isIcebergMode) {
this.maxChunkSizeInBytes = maxChunkSizeInBytes;
this.maxAllowedRowSizeInBytes = maxAllowedRowSizeInBytes;
this.bdecParquetCompression = bdecParquetCompression;
this.enableNewJsonParsingLogic = enableNewJsonParsingLogic;
this.isIcebergMode = isIcebergMode;
}

/** @param clientInternal reference to the client object where the relevant parameters are set */
@@ -49,28 +54,34 @@ public ClientBufferParameters(SnowflakeStreamingIngestClientInternal clientInter
clientInternal != null
? clientInternal.getParameterProvider().getBdecParquetCompressionAlgorithm()
: ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT;

this.enableNewJsonParsingLogic =
clientInternal != null
? clientInternal.getParameterProvider().isEnableNewJsonParsingLogic()
: ParameterProvider.ENABLE_NEW_JSON_PARSING_LOGIC_DEFAULT;
this.isIcebergMode =
clientInternal != null
? clientInternal.isIcebergMode()
: ParameterProvider.IS_ICEBERG_MODE_DEFAULT;
}

/**
* @param maxChunkSizeInBytes maximum chunk size in bytes
* @param maxAllowedRowSizeInBytes maximum row size in bytes
* @param isIcebergMode whether the client is ingesting into Iceberg tables
* @return ClientBufferParameters object
*/
public static ClientBufferParameters test_createClientBufferParameters(
long maxChunkSizeInBytes,
long maxAllowedRowSizeInBytes,
Constants.BdecParquetCompression bdecParquetCompression,
boolean enableNewJsonParsingLogic) {
boolean enableNewJsonParsingLogic,
boolean isIcebergMode) {
return new ClientBufferParameters(
maxChunkSizeInBytes,
maxAllowedRowSizeInBytes,
bdecParquetCompression,
enableNewJsonParsingLogic);
enableNewJsonParsingLogic,
isIcebergMode);
}

public long getMaxChunkSizeInBytes() {
@@ -88,4 +99,8 @@ public Constants.BdecParquetCompression getBdecParquetCompression() {
public boolean isEnableNewJsonParsingLogic() {
return enableNewJsonParsingLogic;
}

public boolean getIsIcebergMode() {
return isIcebergMode;
}
}
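
A hypothetical use of the test factory above; the class name, sizes, and compression algorithm are illustrative, and the snippet assumes the GZIP constant on Constants.BdecParquetCompression:

import net.snowflake.ingest.streaming.internal.ClientBufferParameters;
import net.snowflake.ingest.utils.Constants;

public class BufferParamsSketch {
  public static void main(String[] args) {
    // Build buffer parameters for an Iceberg-mode test; all values are arbitrary.
    ClientBufferParameters params =
        ClientBufferParameters.test_createClientBufferParameters(
            32 * 1024 * 1024, // maxChunkSizeInBytes
            16 * 1024 * 1024, // maxAllowedRowSizeInBytes
            Constants.BdecParquetCompression.GZIP,
            true, // enableNewJsonParsingLogic
            true); // isIcebergMode
    System.out.println(params.getIsIcebergMode()); // true
  }
}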
src/main/java/net/snowflake/ingest/streaming/internal/ColumnMetadata.java
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021 Snowflake Computing Inc. All rights reserved.
* Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;
@@ -22,6 +22,13 @@ class ColumnMetadata {
private boolean nullable;
private String collation;

/**
* The JSON serialization of the column's Iceberg data type; see <a
* href="https://iceberg.apache.org/spec/#appendix-c-json-serialization">JSON serialization</a>
* for more details.
*/
private String sourceIcebergDataType;

/**
* The column ordinal is an internal id of the column used by server scanner for the column
* identification.
@@ -128,6 +135,15 @@ public Integer getOrdinal() {
return ordinal;
}

@JsonProperty("source_iceberg_data_type")
void setSourceIcebergDataType(String sourceIcebergDataType) {
this.sourceIcebergDataType = sourceIcebergDataType;
}

public String getSourceIcebergDataType() {
return sourceIcebergDataType;
}

String getInternalName() {
return internalName;
}
@@ -144,6 +160,7 @@ public String toString() {
map.put("byte_length", this.byteLength);
map.put("length", this.length);
map.put("nullable", this.nullable);
map.put("source_iceberg_datatype", this.sourceIcebergDataType);
return map.toString();
}
}
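
Per the spec section linked in the new Javadoc, primitive types serialize to plain JSON strings that iceberg-api parses straight back; a small sketch, with "decimal(10,2)" as an illustrative value for the new field rather than one taken from this PR:

import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

public class PrimitiveTypeSketch {
  public static void main(String[] args) {
    // Round-trip a primitive type's JSON serialization into an Iceberg type.
    Type parsed = Types.fromPrimitiveString("decimal(10,2)");
    System.out.println(parsed.typeId()); // prints DECIMAL
  }
}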
src/main/java/net/snowflake/ingest/streaming/internal/DataValidationUtil.java
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021 Snowflake Computing Inc. All rights reserved.
* Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;
@@ -19,6 +19,7 @@
import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.math.RoundingMode;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDate;
@@ -718,7 +719,13 @@ static BigDecimal validateAndParseBigDecimal(
|| input instanceof Long) {
return BigDecimal.valueOf(((Number) input).longValue());
} else if (input instanceof Float || input instanceof Double) {
return BigDecimal.valueOf(((Number) input).doubleValue());
try {
return BigDecimal.valueOf(((Number) input).doubleValue());
} catch (NumberFormatException e) {
/* NaN and infinity are not allowed */
throw valueFormatNotAllowedException(
columnName, "NUMBER", "Not a valid number", insertRowIndex);
}
} else if (input instanceof String) {
try {
final String stringInput = ((String) input).trim();
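
The new try/catch exists because BigDecimal.valueOf(double) routes through Double.toString, and the BigDecimal(String) constructor rejects "NaN" and "Infinity" with a NumberFormatException; a standalone check (class name is illustrative):

import java.math.BigDecimal;

public class NanCheck {
  public static void main(String[] args) {
    try {
      // Double.toString(Double.NaN) yields "NaN", which BigDecimal rejects.
      BigDecimal.valueOf(Double.NaN);
    } catch (NumberFormatException e) {
      System.out.println("NaN is not a valid NUMBER input");
    }
  }
}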
@@ -957,6 +964,66 @@ static double validateAndParseReal(String columnName, Object input, long insertR
columnName, input.getClass(), "REAL", new String[] {"Number", "String"}, insertRowIndex);
}

/**
* Validates and parses input Iceberg INT column. Allowed Java types:
*
* <ul>
* <li>Number
* <li>String
* </ul>
*
* @param columnName Column name, used in validation error messages
* @param input Object to validate and parse
* @param insertRowIndex Row index for error reporting
* @return Parsed integer
*/
static int validateAndParseIcebergInt(String columnName, Object input, long insertRowIndex) {
BigDecimal roundedValue =
validateAndParseBigDecimal(columnName, input, insertRowIndex)
.setScale(0, RoundingMode.HALF_UP);
try {
return roundedValue.intValueExact();
} catch (ArithmeticException e) {
/* overflow */
throw new SFException(
ErrorCode.INVALID_VALUE_ROW,
String.format(
"Number out of representable inclusive range of integers between %d and %d,"
+ " rowIndex:%d",
Integer.MIN_VALUE, Integer.MAX_VALUE, insertRowIndex));
}
}

/**
* Validates and parses input Iceberg LONG column. Allowed Java types:
*
* <ul>
* <li>Number
* <li>String
* </ul>
*
* @param columnName Column name, used in validation error messages
* @param input Object to validate and parse
* @param insertRowIndex Row index for error reporting
* @return Parsed long
*/
static long validateAndParseIcebergLong(String columnName, Object input, long insertRowIndex) {
BigDecimal roundedValue =
validateAndParseBigDecimal(columnName, input, insertRowIndex)
.setScale(0, RoundingMode.HALF_UP);
try {
return roundedValue.longValueExact();
} catch (ArithmeticException e) {
/* overflow */
throw new SFException(
ErrorCode.INVALID_VALUE_ROW,
String.format(
"Number out of representable inclusive range of integers between %d and %d,"
+ " rowIndex:%d",
Long.MIN_VALUE, Long.MAX_VALUE, insertRowIndex));
}
}

/**
* Validate and parse input to integer output, 1=true, 0=false. String values converted to boolean
* according to https://docs.snowflake.com/en/sql-reference/functions/to_boolean.html#usage-notes
@@ -1003,6 +1070,16 @@ static void checkValueInRange(
}
}

static void checkFixedLengthByteArray(byte[] bytes, int length, final long insertRowIndex) {
if (bytes.length != length) {
throw new SFException(
ErrorCode.INVALID_FORMAT_ROW,
String.format(
"Binary length mismatch: expected=%d, actual=%d, rowIndex:%d",
length, bytes.length, insertRowIndex));
}
}

static Set<String> allowedBooleanStringsLowerCased =
Sets.newHashSet("1", "0", "yes", "no", "y", "n", "t", "f", "true", "false", "on", "off");
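
Illustrative expectations for the two Iceberg validators added above; the wrapper class is hypothetical and placed in the same package because the methods are package-private, and the column name and row index are arbitrary:

package net.snowflake.ingest.streaming.internal;

public class IcebergValidatorSketch {
  public static void main(String[] args) {
    // Fractional inputs are rounded half-up before the exactness check.
    int i = DataValidationUtil.validateAndParseIcebergInt("MY_COL", "3.5", 0); // 4
    long l = DataValidationUtil.validateAndParseIcebergLong("MY_COL", 2.49, 0); // 2
    System.out.println(i + " " + l);

    // Out-of-range values raise SFException with ErrorCode.INVALID_VALUE_ROW:
    // DataValidationUtil.validateAndParseIcebergInt("MY_COL", 1L + Integer.MAX_VALUE, 0);
  }
}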
