From 7787aa52e4f5c4446c0373f65969e42e44f14ccd Mon Sep 17 00:00:00 2001 From: Alec Huang Date: Mon, 21 Oct 2024 11:50:28 -0700 Subject: [PATCH] address comments --- pom.xml | 15 +- .../ingest/utils/IcebergDataTypeParser.java | 207 +----------------- .../datatypes/IcebergNumericTypesIT.java | 21 +- 3 files changed, 36 insertions(+), 207 deletions(-) diff --git a/pom.xml b/pom.xml index f0023e815..006b9481e 100644 --- a/pom.xml +++ b/pom.xml @@ -154,11 +154,6 @@ commons-lang3 ${commonslang3.version} - - org.apache.commons - commons-text - ${commonstext.version} - org.apache.hadoop hadoop-common @@ -383,6 +378,12 @@ 1.14.9 test + + org.apache.commons + commons-text + ${commonstext.version} + test + org.apache.hadoop hadoop-mapreduce-client-core @@ -543,6 +544,10 @@ org.apache.commons commons-configuration2 + + org.apache.commons + commons-text + org.apache.hadoop hadoop-common diff --git a/src/main/java/net/snowflake/ingest/utils/IcebergDataTypeParser.java b/src/main/java/net/snowflake/ingest/utils/IcebergDataTypeParser.java index 3f1f5fa01..abb03cdef 100644 --- a/src/main/java/net/snowflake/ingest/utils/IcebergDataTypeParser.java +++ b/src/main/java/net/snowflake/ingest/utils/IcebergDataTypeParser.java @@ -4,14 +4,6 @@ package net.snowflake.ingest.utils; -import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; -import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN; -import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE; -import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY; -import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT; -import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32; -import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64; - import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import 
com.fasterxml.jackson.databind.ObjectMapper; @@ -20,11 +12,10 @@ import java.util.Iterator; import java.util.List; import javax.annotation.Nonnull; +import org.apache.iceberg.parquet.TypeToMessageType; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.JsonUtil; -import org.apache.parquet.schema.GroupType; -import org.apache.parquet.schema.LogicalTypeAnnotation; /** * This class is used to Iceberg data type (include primitive types and nested types) serialization @@ -32,9 +23,7 @@ * *

This code is modified from * GlobalServices/modules/data-lake/datalake-api/src/main/java/com/snowflake/metadata/iceberg - * /IcebergDataTypeParser.java and - * TypeToMessageType.java + * /IcebergDataTypeParser.java */ public class IcebergDataTypeParser { private static final String TYPE = "type"; @@ -55,26 +44,12 @@ public class IcebergDataTypeParser { private static final String ELEMENT_REQUIRED = "element-required"; private static final String VALUE_REQUIRED = "value-required"; - private static final LogicalTypeAnnotation STRING = LogicalTypeAnnotation.stringType(); - private static final LogicalTypeAnnotation DATE = LogicalTypeAnnotation.dateType(); - private static final LogicalTypeAnnotation TIME_MICROS = - LogicalTypeAnnotation.timeType( - false /* not adjusted to UTC */, LogicalTypeAnnotation.TimeUnit.MICROS); - private static final LogicalTypeAnnotation TIMESTAMP_MICROS = - LogicalTypeAnnotation.timestampType( - false /* not adjusted to UTC */, LogicalTypeAnnotation.TimeUnit.MICROS); - private static final LogicalTypeAnnotation TIMESTAMPTZ_MICROS = - LogicalTypeAnnotation.timestampType( - true /* adjusted to UTC */, LogicalTypeAnnotation.TimeUnit.MICROS); - - private static final int DECIMAL_INT32_MAX_DIGITS = 9; - private static final int DECIMAL_INT64_MAX_DIGITS = 18; - private static final int DECIMAL_MAX_DIGITS = 38; - private static final int DECIMAL_MAX_BYTES = 16; - /** Object mapper for this class */ private static final ObjectMapper MAPPER = new ObjectMapper(); + /** Util class that contains the mapping between Iceberg data type and Parquet data type */ + private static final TypeToMessageType typeToMessageType = new TypeToMessageType(); + /** * Get Iceberg data type information by deserialization. 
* @@ -91,15 +66,15 @@ public static org.apache.parquet.schema.Type parseIcebergDataTypeStringToParquet String name) { Type icebergType = deserializeIcebergType(icebergDataType); if (icebergType.isPrimitiveType()) { - return primitive(icebergType.asPrimitiveType(), repetition, id, name); + return typeToMessageType.primitive(icebergType.asPrimitiveType(), repetition, id, name); } else { switch (icebergType.typeId()) { case LIST: - return list(icebergType.asListType(), repetition, id, name); + return typeToMessageType.list(icebergType.asListType(), repetition, id, name); case MAP: - return map(icebergType.asMapType(), repetition, id, name); + return typeToMessageType.map(icebergType.asMapType(), repetition, id, name); case STRUCT: - return struct(icebergType.asStructType(), repetition, id, name); + return typeToMessageType.struct(icebergType.asStructType(), repetition, id, name); default: throw new SFException( ErrorCode.INTERNAL_ERROR, @@ -233,168 +208,4 @@ public static Types.MapType mapFromJson(JsonNode json) { return Types.MapType.ofOptional(keyId, valueId, keyType, valueType); } } - - private static GroupType struct( - Types.StructType struct, - org.apache.parquet.schema.Type.Repetition repetition, - int id, - String name) { - org.apache.parquet.schema.Types.GroupBuilder builder = - org.apache.parquet.schema.Types.buildGroup(repetition); - - for (Types.NestedField field : struct.fields()) { - builder.addField(field(field)); - } - - return builder.id(id).named(name); - } - - private static org.apache.parquet.schema.Type field(Types.NestedField field) { - org.apache.parquet.schema.Type.Repetition repetition = - field.isOptional() - ? 
org.apache.parquet.schema.Type.Repetition.OPTIONAL - : org.apache.parquet.schema.Type.Repetition.REQUIRED; - int id = field.fieldId(); - String name = field.name(); - - if (field.type().isPrimitiveType()) { - return primitive(field.type().asPrimitiveType(), repetition, id, name); - - } else { - Type.NestedType nested = field.type().asNestedType(); - if (nested.isStructType()) { - return struct(nested.asStructType(), repetition, id, name); - } else if (nested.isMapType()) { - return map(nested.asMapType(), repetition, id, name); - } else if (nested.isListType()) { - return list(nested.asListType(), repetition, id, name); - } - throw new UnsupportedOperationException("Can't convert unknown type: " + nested); - } - } - - private static GroupType list( - Types.ListType list, - org.apache.parquet.schema.Type.Repetition repetition, - int id, - String name) { - Types.NestedField elementField = list.fields().get(0); - return org.apache.parquet.schema.Types.list(repetition) - .element(field(elementField)) - .id(id) - .named(name); - } - - private static GroupType map( - Types.MapType map, - org.apache.parquet.schema.Type.Repetition repetition, - int id, - String name) { - Types.NestedField keyField = map.fields().get(0); - Types.NestedField valueField = map.fields().get(1); - return org.apache.parquet.schema.Types.map(repetition) - .key(field(keyField)) - .value(field(valueField)) - .id(id) - .named(name); - } - - public static org.apache.parquet.schema.Type primitive( - Type.PrimitiveType primitive, - org.apache.parquet.schema.Type.Repetition repetition, - int id, - String name) { - switch (primitive.typeId()) { - case BOOLEAN: - return org.apache.parquet.schema.Types.primitive(BOOLEAN, repetition).id(id).named(name); - case INTEGER: - return org.apache.parquet.schema.Types.primitive(INT32, repetition).id(id).named(name); - case LONG: - return org.apache.parquet.schema.Types.primitive(INT64, repetition).id(id).named(name); - case FLOAT: - return 
org.apache.parquet.schema.Types.primitive(FLOAT, repetition).id(id).named(name); - case DOUBLE: - return org.apache.parquet.schema.Types.primitive(DOUBLE, repetition).id(id).named(name); - case DATE: - return org.apache.parquet.schema.Types.primitive(INT32, repetition) - .as(DATE) - .id(id) - .named(name); - case TIME: - return org.apache.parquet.schema.Types.primitive(INT64, repetition) - .as(TIME_MICROS) - .id(id) - .named(name); - case TIMESTAMP: - if (((Types.TimestampType) primitive).shouldAdjustToUTC()) { - return org.apache.parquet.schema.Types.primitive(INT64, repetition) - .as(TIMESTAMPTZ_MICROS) - .id(id) - .named(name); - } else { - return org.apache.parquet.schema.Types.primitive(INT64, repetition) - .as(TIMESTAMP_MICROS) - .id(id) - .named(name); - } - case STRING: - return org.apache.parquet.schema.Types.primitive(BINARY, repetition) - .as(STRING) - .id(id) - .named(name); - case BINARY: - return org.apache.parquet.schema.Types.primitive(BINARY, repetition).id(id).named(name); - case FIXED: - Types.FixedType fixed = (Types.FixedType) primitive; - - return org.apache.parquet.schema.Types.primitive(FIXED_LEN_BYTE_ARRAY, repetition) - .length(fixed.length()) - .id(id) - .named(name); - - case DECIMAL: - Types.DecimalType decimal = (Types.DecimalType) primitive; - - if (decimal.precision() <= DECIMAL_INT32_MAX_DIGITS) { - /* Store as an int. */ - return org.apache.parquet.schema.Types.primitive(INT32, repetition) - .as(decimalAnnotation(decimal.precision(), decimal.scale())) - .id(id) - .named(name); - - } else if (decimal.precision() <= DECIMAL_INT64_MAX_DIGITS) { - /* Store as a long. */ - return org.apache.parquet.schema.Types.primitive(INT64, repetition) - .as(decimalAnnotation(decimal.precision(), decimal.scale())) - .id(id) - .named(name); - - } else { - /* Does not follow Iceberg spec which should be minimum number of bytes. Use fix(16) (SB16) instead. 
*/ - if (decimal.precision() > DECIMAL_MAX_DIGITS) { - throw new IllegalArgumentException( - "Unsupported decimal precision: " + decimal.precision()); - } - return org.apache.parquet.schema.Types.primitive(FIXED_LEN_BYTE_ARRAY, repetition) - .length(DECIMAL_MAX_BYTES) - .as(decimalAnnotation(decimal.precision(), decimal.scale())) - .id(id) - .named(name); - } - - case UUID: - return org.apache.parquet.schema.Types.primitive(FIXED_LEN_BYTE_ARRAY, repetition) - .length(16) - .as(LogicalTypeAnnotation.uuidType()) - .id(id) - .named(name); - - default: - throw new UnsupportedOperationException("Unsupported type for Parquet: " + primitive); - } - } - - private static LogicalTypeAnnotation decimalAnnotation(int precision, int scale) { - return LogicalTypeAnnotation.decimalType(scale, precision); - } } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergNumericTypesIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergNumericTypesIT.java index 81f4558e0..73191724c 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergNumericTypesIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergNumericTypesIT.java @@ -12,12 +12,14 @@ import net.snowflake.ingest.utils.Constants; import net.snowflake.ingest.utils.ErrorCode; import net.snowflake.ingest.utils.SFException; -import org.apache.commons.lang3.RandomStringUtils; +import org.apache.commons.text.RandomStringGenerator; import org.assertj.core.api.Assertions; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @Ignore("This test can be enabled after server side Iceberg EP support is released") public class IcebergNumericTypesIT extends AbstractDataTypeTest { @@ -34,11 +36,21 @@ public static Object[][] parameters() { @Parameterized.Parameter(1) public static Constants.IcebergSerializationPolicy 
icebergSerializationPolicy; - static final Random generator = new Random(0x5EED); + private static final Logger logger = LoggerFactory.getLogger(IcebergNumericTypesIT.class); + private static Random generator; + private static RandomStringGenerator randomStringGenerator; @Before public void before() throws Exception { super.beforeIceberg(compressionAlgorithm, icebergSerializationPolicy); + long seed = System.currentTimeMillis(); + logger.info("Random seed: {}", seed); + generator = new Random(seed); + randomStringGenerator = + new RandomStringGenerator.Builder() + .usingRandom(generator::nextInt) + .withinRange('0', '9') + .build(); } @Test @@ -417,9 +429,10 @@ private static List randomBigDecimal(int count, int precision, int scale } list.add( new BigDecimal( - RandomStringUtils.randomNumeric(intPart) + (generator.nextBoolean() ? "-" : "") + + randomStringGenerator.generate(intPart) + "." - + RandomStringUtils.randomNumeric(floatPart))); + + randomStringGenerator.generate(floatPart))); } return list; }