Skip to content

Commit

Permalink
tmp
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-alhuang committed Oct 18, 2024
1 parent 5916907 commit f61bec6
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 201 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -418,18 +418,31 @@ private static ParquetBufferValue getStructValue(
DataValidationUtil.validateAndParseIcebergStruct(path, value, insertRowsCurrIndex);
Set<String> extraFields = new HashSet<>(structVal.keySet());
List<Object> listVal = new ArrayList<>(type.getFieldCount());

float estimatedParquetSize = 0f;
for (int i = 0; i < type.getFieldCount(); i++) {
StringBuilder sb = new StringBuilder();
String fieldName = type.getFieldName(i);
for (int j = 0; j < fieldName.length(); j++) {
if (fieldName.charAt(j) == '_') {
sb.append((char) Integer.parseInt(fieldName.substring(j + 2, j + 4), 16));
j += 3;
} else {
sb.append(fieldName.charAt(j));
}
}
String originalFieldName = sb.substring(0, sb.toString().lastIndexOf('_'));

ParquetBufferValue parsedValue =
parseColumnValueToParquet(
structVal.getOrDefault(type.getFieldName(i), null),
structVal.getOrDefault(originalFieldName, null),
type.getType(i),
statsMap,
defaultTimezone,
insertRowsCurrIndex,
path,
isDescendantsOfRepeatingGroup);
extraFields.remove(type.getFieldName(i));
extraFields.remove(originalFieldName);
listVal.add(parsedValue.getValue());
estimatedParquetSize += parsedValue.getSize();
}
Expand Down
209 changes: 10 additions & 199 deletions src/main/java/net/snowflake/ingest/utils/IcebergDataTypeParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,6 @@

package net.snowflake.ingest.utils;

import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand All @@ -20,21 +12,18 @@
import java.util.Iterator;
import java.util.List;
import javax.annotation.Nonnull;
import org.apache.iceberg.parquet.TypeToMessageType;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.JsonUtil;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.LogicalTypeAnnotation;

/**
* This class is used to Iceberg data type (include primitive types and nested types) serialization
* and deserialization.
*
* <p>This code is modified from
* GlobalServices/modules/data-lake/datalake-api/src/main/java/com/snowflake/metadata/iceberg
* /IcebergDataTypeParser.java and <a
* href="https://github.com/apache/iceberg/blob/main/parquet/src/main/java/org/apache/iceberg/parquet/TypeToMessageType.java">
* TypeToMessageType.java</a>
* /IcebergDataTypeParser.java
*/
public class IcebergDataTypeParser {
private static final String TYPE = "type";
Expand All @@ -55,26 +44,12 @@ public class IcebergDataTypeParser {
private static final String ELEMENT_REQUIRED = "element-required";
private static final String VALUE_REQUIRED = "value-required";

private static final LogicalTypeAnnotation STRING = LogicalTypeAnnotation.stringType();
private static final LogicalTypeAnnotation DATE = LogicalTypeAnnotation.dateType();
private static final LogicalTypeAnnotation TIME_MICROS =
LogicalTypeAnnotation.timeType(
false /* not adjusted to UTC */, LogicalTypeAnnotation.TimeUnit.MICROS);
private static final LogicalTypeAnnotation TIMESTAMP_MICROS =
LogicalTypeAnnotation.timestampType(
false /* not adjusted to UTC */, LogicalTypeAnnotation.TimeUnit.MICROS);
private static final LogicalTypeAnnotation TIMESTAMPTZ_MICROS =
LogicalTypeAnnotation.timestampType(
true /* adjusted to UTC */, LogicalTypeAnnotation.TimeUnit.MICROS);

private static final int DECIMAL_INT32_MAX_DIGITS = 9;
private static final int DECIMAL_INT64_MAX_DIGITS = 18;
private static final int DECIMAL_MAX_DIGITS = 38;
private static final int DECIMAL_MAX_BYTES = 16;

/** Object mapper for this class */
private static final ObjectMapper MAPPER = new ObjectMapper();

/** Util class that contains the mapping between Iceberg data type and Parquet data type */
private static final TypeToMessageType typeToMessageType = new TypeToMessageType();

/**
* Get Iceberg data type information by deserialization.
*
Expand All @@ -91,15 +66,15 @@ public static org.apache.parquet.schema.Type parseIcebergDataTypeStringToParquet
String name) {
Type icebergType = deserializeIcebergType(icebergDataType);
if (icebergType.isPrimitiveType()) {
return primitive(icebergType.asPrimitiveType(), repetition, id, name);
return typeToMessageType.primitive(icebergType.asPrimitiveType(), repetition, id, name);
} else {
switch (icebergType.typeId()) {
case LIST:
return list(icebergType.asListType(), repetition, id, name);
return typeToMessageType.list(icebergType.asListType(), repetition, id, name);
case MAP:
return map(icebergType.asMapType(), repetition, id, name);
return typeToMessageType.map(icebergType.asMapType(), repetition, id, name);
case STRUCT:
return struct(icebergType.asStructType(), repetition, id, name);
return typeToMessageType.struct(icebergType.asStructType(), repetition, id, name);
default:
throw new SFException(
ErrorCode.INTERNAL_ERROR,
Expand Down Expand Up @@ -179,7 +154,7 @@ public static Type getTypeFromJson(@Nonnull JsonNode jsonNode) {
field.isObject(), "Cannot parse struct field from non-object: %s", field);

int id = JsonUtil.getInt(ID, field);
String name = JsonUtil.getString(NAME, field);
String name = (JsonUtil.getString(NAME, field) + "_" + id).replace("_", "_x5F");
Type type = getTypeFromJson(field.get(TYPE));

String doc = JsonUtil.getStringOrNull(DOC, field);
Expand Down Expand Up @@ -233,168 +208,4 @@ public static Types.MapType mapFromJson(JsonNode json) {
return Types.MapType.ofOptional(keyId, valueId, keyType, valueType);
}
}

/**
 * Converts an Iceberg struct type into the equivalent Parquet {@link GroupType}, recursively
 * converting every nested field via {@link #field(Types.NestedField)}.
 *
 * @param struct Iceberg struct type to convert
 * @param repetition Parquet repetition (REQUIRED/OPTIONAL/REPEATED) for the resulting group
 * @param id Iceberg field id to attach to the Parquet group
 * @param name name of the resulting Parquet group
 * @return Parquet group type mirroring the Iceberg struct
 */
private static GroupType struct(
    Types.StructType struct,
    org.apache.parquet.schema.Type.Repetition repetition,
    int id,
    String name) {
  org.apache.parquet.schema.Types.GroupBuilder<GroupType> groupBuilder =
      org.apache.parquet.schema.Types.buildGroup(repetition);
  // Add each converted child field in declaration order.
  struct.fields().forEach(nestedField -> groupBuilder.addField(field(nestedField)));
  return groupBuilder.id(id).named(name);
}

/**
 * Converts a single Iceberg field to its Parquet counterpart, dispatching on whether the field
 * holds a primitive type or a nested type (struct, map, or list).
 *
 * @param field Iceberg field to convert
 * @return Parquet type for the field, carrying the same field id and name
 * @throws UnsupportedOperationException if the nested type is of an unrecognized kind
 */
private static org.apache.parquet.schema.Type field(Types.NestedField field) {
  // Iceberg optionality maps directly onto Parquet OPTIONAL vs REQUIRED repetition.
  org.apache.parquet.schema.Type.Repetition rep =
      field.isOptional()
          ? org.apache.parquet.schema.Type.Repetition.OPTIONAL
          : org.apache.parquet.schema.Type.Repetition.REQUIRED;
  int fieldId = field.fieldId();
  String fieldName = field.name();

  if (field.type().isPrimitiveType()) {
    return primitive(field.type().asPrimitiveType(), rep, fieldId, fieldName);
  }

  Type.NestedType nestedType = field.type().asNestedType();
  if (nestedType.isStructType()) {
    return struct(nestedType.asStructType(), rep, fieldId, fieldName);
  }
  if (nestedType.isMapType()) {
    return map(nestedType.asMapType(), rep, fieldId, fieldName);
  }
  if (nestedType.isListType()) {
    return list(nestedType.asListType(), rep, fieldId, fieldName);
  }
  throw new UnsupportedOperationException("Can't convert unknown type: " + nestedType);
}

/**
 * Converts an Iceberg list type to the Parquet three-level list representation.
 *
 * @param list Iceberg list type to convert
 * @param repetition Parquet repetition for the outer list group
 * @param id Iceberg field id to attach to the Parquet group
 * @param name name of the resulting Parquet group
 * @return Parquet list group type
 */
private static GroupType list(
    Types.ListType list,
    org.apache.parquet.schema.Type.Repetition repetition,
    int id,
    String name) {
  // An Iceberg list exposes exactly one nested field: its element.
  Types.NestedField element = list.fields().get(0);
  org.apache.parquet.schema.Types.ListBuilder<GroupType> listBuilder =
      org.apache.parquet.schema.Types.list(repetition);
  return listBuilder.element(field(element)).id(id).named(name);
}

/**
 * Converts an Iceberg map type to the Parquet map representation (key_value group with a
 * required key and a value).
 *
 * @param map Iceberg map type to convert
 * @param repetition Parquet repetition for the outer map group
 * @param id Iceberg field id to attach to the Parquet group
 * @param name name of the resulting Parquet group
 * @return Parquet map group type
 */
private static GroupType map(
    Types.MapType map,
    org.apache.parquet.schema.Type.Repetition repetition,
    int id,
    String name) {
  // An Iceberg map exposes exactly two nested fields: key at index 0, value at index 1.
  Types.NestedField key = map.fields().get(0);
  Types.NestedField value = map.fields().get(1);
  return org.apache.parquet.schema.Types.map(repetition)
      .key(field(key))
      .value(field(value))
      .id(id)
      .named(name);
}

/**
 * Converts an Iceberg primitive type to the corresponding Parquet primitive type, attaching the
 * matching logical-type annotation where one exists (date, time, timestamp, string, decimal,
 * UUID).
 *
 * @param primitive Iceberg primitive type to convert
 * @param repetition Parquet repetition (REQUIRED/OPTIONAL/REPEATED) for the resulting type
 * @param id Iceberg field id to attach to the Parquet type
 * @param name name of the resulting Parquet type
 * @return Parquet primitive type equivalent to the Iceberg type
 * @throws IllegalArgumentException if a decimal precision exceeds {@code DECIMAL_MAX_DIGITS}
 * @throws UnsupportedOperationException if the Iceberg type has no Parquet mapping
 */
public static org.apache.parquet.schema.Type primitive(
    Type.PrimitiveType primitive,
    org.apache.parquet.schema.Type.Repetition repetition,
    int id,
    String name) {
  switch (primitive.typeId()) {
    case BOOLEAN:
      return org.apache.parquet.schema.Types.primitive(BOOLEAN, repetition).id(id).named(name);
    case INTEGER:
      return org.apache.parquet.schema.Types.primitive(INT32, repetition).id(id).named(name);
    case LONG:
      return org.apache.parquet.schema.Types.primitive(INT64, repetition).id(id).named(name);
    case FLOAT:
      return org.apache.parquet.schema.Types.primitive(FLOAT, repetition).id(id).named(name);
    case DOUBLE:
      return org.apache.parquet.schema.Types.primitive(DOUBLE, repetition).id(id).named(name);
    case DATE:
      // Dates are stored as INT32 days with the DATE logical annotation.
      return org.apache.parquet.schema.Types.primitive(INT32, repetition)
          .as(DATE)
          .id(id)
          .named(name);
    case TIME:
      // Times are stored as INT64 microseconds (not adjusted to UTC).
      return org.apache.parquet.schema.Types.primitive(INT64, repetition)
          .as(TIME_MICROS)
          .id(id)
          .named(name);
    case TIMESTAMP:
      // Both variants store INT64 microseconds; the annotation records whether the value is
      // adjusted to UTC (timestamptz) or local/unzoned (timestamp).
      if (((Types.TimestampType) primitive).shouldAdjustToUTC()) {
        return org.apache.parquet.schema.Types.primitive(INT64, repetition)
            .as(TIMESTAMPTZ_MICROS)
            .id(id)
            .named(name);
      } else {
        return org.apache.parquet.schema.Types.primitive(INT64, repetition)
            .as(TIMESTAMP_MICROS)
            .id(id)
            .named(name);
      }
    case STRING:
      return org.apache.parquet.schema.Types.primitive(BINARY, repetition)
          .as(STRING)
          .id(id)
          .named(name);
    case BINARY:
      return org.apache.parquet.schema.Types.primitive(BINARY, repetition).id(id).named(name);
    case FIXED:
      Types.FixedType fixed = (Types.FixedType) primitive;

      return org.apache.parquet.schema.Types.primitive(FIXED_LEN_BYTE_ARRAY, repetition)
          .length(fixed.length())
          .id(id)
          .named(name);

    case DECIMAL:
      // Physical layout depends on precision: INT32 up to 9 digits, INT64 up to 18, otherwise
      // a fixed-length byte array.
      Types.DecimalType decimal = (Types.DecimalType) primitive;

      if (decimal.precision() <= DECIMAL_INT32_MAX_DIGITS) {
        /* Store as an int. */
        return org.apache.parquet.schema.Types.primitive(INT32, repetition)
            .as(decimalAnnotation(decimal.precision(), decimal.scale()))
            .id(id)
            .named(name);

      } else if (decimal.precision() <= DECIMAL_INT64_MAX_DIGITS) {
        /* Store as a long. */
        return org.apache.parquet.schema.Types.primitive(INT64, repetition)
            .as(decimalAnnotation(decimal.precision(), decimal.scale()))
            .id(id)
            .named(name);

      } else {
        /* Does not follow Iceberg spec which should be minimum number of bytes. Use fix(16) (SB16) instead. */
        if (decimal.precision() > DECIMAL_MAX_DIGITS) {
          throw new IllegalArgumentException(
              "Unsupported decimal precision: " + decimal.precision());
        }
        return org.apache.parquet.schema.Types.primitive(FIXED_LEN_BYTE_ARRAY, repetition)
            .length(DECIMAL_MAX_BYTES)
            .as(decimalAnnotation(decimal.precision(), decimal.scale()))
            .id(id)
            .named(name);
      }

    case UUID:
      // UUIDs are 16-byte fixed arrays with the UUID logical annotation.
      return org.apache.parquet.schema.Types.primitive(FIXED_LEN_BYTE_ARRAY, repetition)
          .length(16)
          .as(LogicalTypeAnnotation.uuidType())
          .id(id)
          .named(name);

    default:
      throw new UnsupportedOperationException("Unsupported type for Parquet: " + primitive);
  }
}

/**
 * Builds a Parquet decimal logical-type annotation for the given precision and scale.
 *
 * <p>Note the argument order flips here: this helper takes (precision, scale) while the Parquet
 * API {@code LogicalTypeAnnotation.decimalType} takes (scale, precision).
 *
 * @param precision total number of digits
 * @param scale number of digits after the decimal point
 * @return decimal logical-type annotation
 */
private static LogicalTypeAnnotation decimalAnnotation(int precision, int scale) {
  return LogicalTypeAnnotation.decimalType(scale, precision);
}
}

0 comments on commit f61bec6

Please sign in to comment.