Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-alhuang committed Oct 21, 2024
1 parent 5916907 commit 7787aa5
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 207 deletions.
15 changes: 10 additions & 5 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -154,11 +154,6 @@
<artifactId>commons-lang3</artifactId>
<version>${commonslang3.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-text</artifactId>
<version>${commonstext.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
Expand Down Expand Up @@ -383,6 +378,12 @@
<version>1.14.9</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-text</artifactId>
<version>${commonstext.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
Expand Down Expand Up @@ -543,6 +544,10 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-configuration2</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-text</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
Expand Down
207 changes: 9 additions & 198 deletions src/main/java/net/snowflake/ingest/utils/IcebergDataTypeParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,6 @@

package net.snowflake.ingest.utils;

import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand All @@ -20,21 +12,18 @@
import java.util.Iterator;
import java.util.List;
import javax.annotation.Nonnull;
import org.apache.iceberg.parquet.TypeToMessageType;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.JsonUtil;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.LogicalTypeAnnotation;

/**
* This class is used to Iceberg data type (include primitive types and nested types) serialization
* and deserialization.
*
* <p>This code is modified from
* GlobalServices/modules/data-lake/datalake-api/src/main/java/com/snowflake/metadata/iceberg
* /IcebergDataTypeParser.java and <a
* href="https://github.com/apache/iceberg/blob/main/parquet/src/main/java/org/apache/iceberg/parquet/TypeToMessageType.java">
* TypeToMessageType.java</a>
* /IcebergDataTypeParser.java
*/
public class IcebergDataTypeParser {
private static final String TYPE = "type";
Expand All @@ -55,26 +44,12 @@ public class IcebergDataTypeParser {
private static final String ELEMENT_REQUIRED = "element-required";
private static final String VALUE_REQUIRED = "value-required";

private static final LogicalTypeAnnotation STRING = LogicalTypeAnnotation.stringType();
private static final LogicalTypeAnnotation DATE = LogicalTypeAnnotation.dateType();
private static final LogicalTypeAnnotation TIME_MICROS =
LogicalTypeAnnotation.timeType(
false /* not adjusted to UTC */, LogicalTypeAnnotation.TimeUnit.MICROS);
private static final LogicalTypeAnnotation TIMESTAMP_MICROS =
LogicalTypeAnnotation.timestampType(
false /* not adjusted to UTC */, LogicalTypeAnnotation.TimeUnit.MICROS);
private static final LogicalTypeAnnotation TIMESTAMPTZ_MICROS =
LogicalTypeAnnotation.timestampType(
true /* adjusted to UTC */, LogicalTypeAnnotation.TimeUnit.MICROS);

private static final int DECIMAL_INT32_MAX_DIGITS = 9;
private static final int DECIMAL_INT64_MAX_DIGITS = 18;
private static final int DECIMAL_MAX_DIGITS = 38;
private static final int DECIMAL_MAX_BYTES = 16;

/** Object mapper for this class */
private static final ObjectMapper MAPPER = new ObjectMapper();

/** Util class that contains the mapping between Iceberg data type and Parquet data type */
private static final TypeToMessageType typeToMessageType = new TypeToMessageType();

/**
* Get Iceberg data type information by deserialization.
*
Expand All @@ -91,15 +66,15 @@ public static org.apache.parquet.schema.Type parseIcebergDataTypeStringToParquet
String name) {
Type icebergType = deserializeIcebergType(icebergDataType);
if (icebergType.isPrimitiveType()) {
return primitive(icebergType.asPrimitiveType(), repetition, id, name);
return typeToMessageType.primitive(icebergType.asPrimitiveType(), repetition, id, name);
} else {
switch (icebergType.typeId()) {
case LIST:
return list(icebergType.asListType(), repetition, id, name);
return typeToMessageType.list(icebergType.asListType(), repetition, id, name);
case MAP:
return map(icebergType.asMapType(), repetition, id, name);
return typeToMessageType.map(icebergType.asMapType(), repetition, id, name);
case STRUCT:
return struct(icebergType.asStructType(), repetition, id, name);
return typeToMessageType.struct(icebergType.asStructType(), repetition, id, name);
default:
throw new SFException(
ErrorCode.INTERNAL_ERROR,
Expand Down Expand Up @@ -233,168 +208,4 @@ public static Types.MapType mapFromJson(JsonNode json) {
return Types.MapType.ofOptional(keyId, valueId, keyType, valueType);
}
}

private static GroupType struct(
Types.StructType struct,
org.apache.parquet.schema.Type.Repetition repetition,
int id,
String name) {
org.apache.parquet.schema.Types.GroupBuilder<GroupType> builder =
org.apache.parquet.schema.Types.buildGroup(repetition);

for (Types.NestedField field : struct.fields()) {
builder.addField(field(field));
}

return builder.id(id).named(name);
}

private static org.apache.parquet.schema.Type field(Types.NestedField field) {
org.apache.parquet.schema.Type.Repetition repetition =
field.isOptional()
? org.apache.parquet.schema.Type.Repetition.OPTIONAL
: org.apache.parquet.schema.Type.Repetition.REQUIRED;
int id = field.fieldId();
String name = field.name();

if (field.type().isPrimitiveType()) {
return primitive(field.type().asPrimitiveType(), repetition, id, name);

} else {
Type.NestedType nested = field.type().asNestedType();
if (nested.isStructType()) {
return struct(nested.asStructType(), repetition, id, name);
} else if (nested.isMapType()) {
return map(nested.asMapType(), repetition, id, name);
} else if (nested.isListType()) {
return list(nested.asListType(), repetition, id, name);
}
throw new UnsupportedOperationException("Can't convert unknown type: " + nested);
}
}

private static GroupType list(
Types.ListType list,
org.apache.parquet.schema.Type.Repetition repetition,
int id,
String name) {
Types.NestedField elementField = list.fields().get(0);
return org.apache.parquet.schema.Types.list(repetition)
.element(field(elementField))
.id(id)
.named(name);
}

private static GroupType map(
Types.MapType map,
org.apache.parquet.schema.Type.Repetition repetition,
int id,
String name) {
Types.NestedField keyField = map.fields().get(0);
Types.NestedField valueField = map.fields().get(1);
return org.apache.parquet.schema.Types.map(repetition)
.key(field(keyField))
.value(field(valueField))
.id(id)
.named(name);
}

public static org.apache.parquet.schema.Type primitive(
Type.PrimitiveType primitive,
org.apache.parquet.schema.Type.Repetition repetition,
int id,
String name) {
switch (primitive.typeId()) {
case BOOLEAN:
return org.apache.parquet.schema.Types.primitive(BOOLEAN, repetition).id(id).named(name);
case INTEGER:
return org.apache.parquet.schema.Types.primitive(INT32, repetition).id(id).named(name);
case LONG:
return org.apache.parquet.schema.Types.primitive(INT64, repetition).id(id).named(name);
case FLOAT:
return org.apache.parquet.schema.Types.primitive(FLOAT, repetition).id(id).named(name);
case DOUBLE:
return org.apache.parquet.schema.Types.primitive(DOUBLE, repetition).id(id).named(name);
case DATE:
return org.apache.parquet.schema.Types.primitive(INT32, repetition)
.as(DATE)
.id(id)
.named(name);
case TIME:
return org.apache.parquet.schema.Types.primitive(INT64, repetition)
.as(TIME_MICROS)
.id(id)
.named(name);
case TIMESTAMP:
if (((Types.TimestampType) primitive).shouldAdjustToUTC()) {
return org.apache.parquet.schema.Types.primitive(INT64, repetition)
.as(TIMESTAMPTZ_MICROS)
.id(id)
.named(name);
} else {
return org.apache.parquet.schema.Types.primitive(INT64, repetition)
.as(TIMESTAMP_MICROS)
.id(id)
.named(name);
}
case STRING:
return org.apache.parquet.schema.Types.primitive(BINARY, repetition)
.as(STRING)
.id(id)
.named(name);
case BINARY:
return org.apache.parquet.schema.Types.primitive(BINARY, repetition).id(id).named(name);
case FIXED:
Types.FixedType fixed = (Types.FixedType) primitive;

return org.apache.parquet.schema.Types.primitive(FIXED_LEN_BYTE_ARRAY, repetition)
.length(fixed.length())
.id(id)
.named(name);

case DECIMAL:
Types.DecimalType decimal = (Types.DecimalType) primitive;

if (decimal.precision() <= DECIMAL_INT32_MAX_DIGITS) {
/* Store as an int. */
return org.apache.parquet.schema.Types.primitive(INT32, repetition)
.as(decimalAnnotation(decimal.precision(), decimal.scale()))
.id(id)
.named(name);

} else if (decimal.precision() <= DECIMAL_INT64_MAX_DIGITS) {
/* Store as a long. */
return org.apache.parquet.schema.Types.primitive(INT64, repetition)
.as(decimalAnnotation(decimal.precision(), decimal.scale()))
.id(id)
.named(name);

} else {
/* Does not follow Iceberg spec which should be minimum number of bytes. Use fix(16) (SB16) instead. */
if (decimal.precision() > DECIMAL_MAX_DIGITS) {
throw new IllegalArgumentException(
"Unsupported decimal precision: " + decimal.precision());
}
return org.apache.parquet.schema.Types.primitive(FIXED_LEN_BYTE_ARRAY, repetition)
.length(DECIMAL_MAX_BYTES)
.as(decimalAnnotation(decimal.precision(), decimal.scale()))
.id(id)
.named(name);
}

case UUID:
return org.apache.parquet.schema.Types.primitive(FIXED_LEN_BYTE_ARRAY, repetition)
.length(16)
.as(LogicalTypeAnnotation.uuidType())
.id(id)
.named(name);

default:
throw new UnsupportedOperationException("Unsupported type for Parquet: " + primitive);
}
}

private static LogicalTypeAnnotation decimalAnnotation(int precision, int scale) {
return LogicalTypeAnnotation.decimalType(scale, precision);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.SFException;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.text.RandomStringGenerator;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Ignore("This test can be enabled after server side Iceberg EP support is released")
public class IcebergNumericTypesIT extends AbstractDataTypeTest {
Expand All @@ -34,11 +36,21 @@ public static Object[][] parameters() {
@Parameterized.Parameter(1)
public static Constants.IcebergSerializationPolicy icebergSerializationPolicy;

static final Random generator = new Random(0x5EED);
private static final Logger logger = LoggerFactory.getLogger(IcebergNumericTypesIT.class);
private static Random generator;
private static RandomStringGenerator randomStringGenerator;

@Before
public void before() throws Exception {
super.beforeIceberg(compressionAlgorithm, icebergSerializationPolicy);
long seed = System.currentTimeMillis();
logger.info("Random seed: {}", seed);
generator = new Random(123);
randomStringGenerator =
new RandomStringGenerator.Builder()
.usingRandom(generator::nextInt)
.withinRange('0', '9')
.build();
}

@Test
Expand Down Expand Up @@ -417,9 +429,10 @@ private static List<Object> randomBigDecimal(int count, int precision, int scale
}
list.add(
new BigDecimal(
RandomStringUtils.randomNumeric(intPart)
(generator.nextBoolean() ? "-" : "")
+ randomStringGenerator.generate(intPart)
+ "."
+ RandomStringUtils.randomNumeric(floatPart)));
+ randomStringGenerator.generate(floatPart)));
}
return list;
}
Expand Down

0 comments on commit 7787aa5

Please sign in to comment.