
Commit ba48f45: done

sfc-gh-alhuang committed Nov 4, 2024
1 parent b38f969 commit ba48f45
Showing 11 changed files with 844 additions and 273 deletions.
IcebergParquetValueParser.java
@@ -5,8 +5,6 @@
 package net.snowflake.ingest.streaming.internal;
 
 import static net.snowflake.ingest.streaming.internal.DataValidationUtil.checkFixedLengthByteArray;
-import static net.snowflake.ingest.utils.Utils.concatDotPath;
-import static net.snowflake.ingest.utils.Utils.isNullOrEmpty;
 
 import java.math.BigDecimal;
 import java.math.BigInteger;
@@ -41,9 +39,6 @@
 /** Parses a user Iceberg column value into Parquet internal representation for buffering. */
 class IcebergParquetValueParser {
 
-  static final String THREE_LEVEL_MAP_GROUP_NAME = "key_value";
-  static final String THREE_LEVEL_LIST_GROUP_NAME = "list";
-
   /**
    * Parses a user column value into Parquet internal representation for buffering.
    *
@@ -65,7 +60,7 @@ static ParquetBufferValue parseColumnValueToParquet(
       long insertRowsCurrIndex) {
     Utils.assertNotNull("Parquet column stats map", statsMap);
     return parseColumnValueToParquet(
-        value, type, statsMap, subColumnFinder, defaultTimezone, insertRowsCurrIndex, null, false);
+        value, type, statsMap, subColumnFinder, defaultTimezone, insertRowsCurrIndex, false);
   }
 
   private static ParquetBufferValue parseColumnValueToParquet(
@@ -75,21 +70,27 @@ private static ParquetBufferValue parseColumnValueToParquet(
       SubColumnFinder subColumnFinder,
       ZoneId defaultTimezone,
       long insertRowsCurrIndex,
-      String path,
       boolean isDescendantsOfRepeatingGroup) {
-    path = isNullOrEmpty(path) ? type.getName() : concatDotPath(path, type.getName());
     float estimatedParquetSize = 0F;
 
+    if (type.getId() == null) {
+      throw new SFException(
+          ErrorCode.INTERNAL_ERROR, String.format("Id not found for field: %s", type.getName()));
+    }
+    String id = type.getId().toString();
+
     if (type.isPrimitive()) {
-      if (!statsMap.containsKey(path)) {
+      if (!statsMap.containsKey(id)) {
         throw new SFException(
-            ErrorCode.INTERNAL_ERROR, String.format("Stats not found for column: %s", path));
+            ErrorCode.INTERNAL_ERROR,
+            String.format("Stats not found for fieldId: %s", type.getId()));
       }
     }
 
     if (value != null) {
+      String path = subColumnFinder.getDotPath(id);
       if (type.isPrimitive()) {
-        RowBufferStats stats = statsMap.get(path);
+        RowBufferStats stats = statsMap.get(id);
         estimatedParquetSize += ParquetBufferValue.DEFINITION_LEVEL_ENCODING_BYTE_LEN;
         estimatedParquetSize +=
             isDescendantsOfRepeatingGroup
@@ -160,7 +161,6 @@ private static ParquetBufferValue parseColumnValueToParquet(
             subColumnFinder,
             defaultTimezone,
             insertRowsCurrIndex,
-            path,
             isDescendantsOfRepeatingGroup);
       }
     }
@@ -169,13 +169,13 @@ private static ParquetBufferValue parseColumnValueToParquet(
       if (type.isRepetition(Repetition.REQUIRED)) {
         throw new SFException(
             ErrorCode.INVALID_FORMAT_ROW,
-            path,
+            subColumnFinder.getDotPath(id),
             String.format(
                 "Passed null to non nullable field, rowIndex:%d, column:%s",
-                insertRowsCurrIndex, path));
+                insertRowsCurrIndex, subColumnFinder.getDotPath(id)));
       }
       subColumnFinder
-          .getSubColumns(path)
+          .getSubColumns(id)
           .forEach(subColumn -> statsMap.get(subColumn).incCurrentNullCount());
     }

@@ -381,7 +381,6 @@ private static int timeUnitToScale(LogicalTypeAnnotation.TimeUnit timeUnit) {
    * @param subColumnFinder helper class to find stats of sub-columns
    * @param defaultTimezone default timezone to use for timestamp parsing
    * @param insertRowsCurrIndex Used for logging the row of index given in insertRows API
-   * @param path dot path of the column
    * @param isDescendantsOfRepeatingGroup true if the column is a descendant of a repeating group,
    * @return list of parsed values
    */
@@ -392,7 +391,6 @@ private static ParquetBufferValue getGroupValue(
       SubColumnFinder subColumnFinder,
       ZoneId defaultTimezone,
       final long insertRowsCurrIndex,
-      String path,
       boolean isDescendantsOfRepeatingGroup) {
     LogicalTypeAnnotation logicalTypeAnnotation = type.getLogicalTypeAnnotation();
     if (logicalTypeAnnotation == null) {
@@ -403,19 +401,21 @@
           subColumnFinder,
           defaultTimezone,
           insertRowsCurrIndex,
-          path,
           isDescendantsOfRepeatingGroup);
     }
     if (logicalTypeAnnotation instanceof LogicalTypeAnnotation.ListLogicalTypeAnnotation) {
       return get3LevelListValue(
-          value, type, statsMap, subColumnFinder, defaultTimezone, insertRowsCurrIndex, path);
+          value, type, statsMap, subColumnFinder, defaultTimezone, insertRowsCurrIndex);
     }
     if (logicalTypeAnnotation instanceof LogicalTypeAnnotation.MapLogicalTypeAnnotation) {
       return get3LevelMapValue(
-          value, type, statsMap, subColumnFinder, defaultTimezone, insertRowsCurrIndex, path);
+          value, type, statsMap, subColumnFinder, defaultTimezone, insertRowsCurrIndex);
     }
     throw new SFException(
-        ErrorCode.UNKNOWN_DATA_TYPE, path, logicalTypeAnnotation, type.getClass().getSimpleName());
+        ErrorCode.UNKNOWN_DATA_TYPE,
+        subColumnFinder.getDotPath(type.getId().toString()),
+        logicalTypeAnnotation,
+        type.getClass().getSimpleName());
   }
 
   /**
@@ -430,10 +430,10 @@ private static ParquetBufferValue getStructValue(
       SubColumnFinder subColumnFinder,
       ZoneId defaultTimezone,
       final long insertRowsCurrIndex,
-      String path,
       boolean isDescendantsOfRepeatingGroup) {
     Map<String, ?> structVal =
-        DataValidationUtil.validateAndParseIcebergStruct(path, value, insertRowsCurrIndex);
+        DataValidationUtil.validateAndParseIcebergStruct(
+            subColumnFinder.getDotPath(type.getId().toString()), value, insertRowsCurrIndex);
     Set<String> extraFields = new HashSet<>(structVal.keySet());
     List<Object> listVal = new ArrayList<>(type.getFieldCount());
     float estimatedParquetSize = 0f;
@@ -446,21 +446,19 @@ private static ParquetBufferValue getStructValue(
               subColumnFinder,
               defaultTimezone,
               insertRowsCurrIndex,
-              path,
               isDescendantsOfRepeatingGroup);
       extraFields.remove(type.getFieldName(i));
       listVal.add(parsedValue.getValue());
       estimatedParquetSize += parsedValue.getSize();
     }
     if (!extraFields.isEmpty()) {
-      String extraFieldsStr =
-          extraFields.stream().map(f -> concatDotPath(path, f)).collect(Collectors.joining(", "));
+      String extraFieldsStr = extraFields.stream().collect(Collectors.joining(", ", "[", "]"));
       throw new SFException(
           ErrorCode.INVALID_FORMAT_ROW,
           "Extra fields: " + extraFieldsStr,
           String.format(
-              "Fields not present in the struct shouldn't be specified, rowIndex:%d",
-              insertRowsCurrIndex));
+              "Fields not present in the struct %s shouldn't be specified, rowIndex:%d",
+              subColumnFinder.getDotPath(type.getId().toString()), insertRowsCurrIndex));
     }
     return new ParquetBufferValue(listVal, estimatedParquetSize);
   }
@@ -478,13 +476,12 @@ private static ParquetBufferValue get3LevelListValue(
       Map<String, RowBufferStats> statsMap,
       SubColumnFinder subColumnFinder,
       ZoneId defaultTimezone,
-      final long insertRowsCurrIndex,
-      String path) {
+      final long insertRowsCurrIndex) {
     Iterable<?> iterableVal =
-        DataValidationUtil.validateAndParseIcebergList(path, value, insertRowsCurrIndex);
+        DataValidationUtil.validateAndParseIcebergList(
+            subColumnFinder.getDotPath(type.getId().toString()), value, insertRowsCurrIndex);
     List<Object> listVal = new ArrayList<>();
     float estimatedParquetSize = 0;
-    String listGroupPath = concatDotPath(path, THREE_LEVEL_LIST_GROUP_NAME);
     for (Object val : iterableVal) {
       ParquetBufferValue parsedValue =
           parseColumnValueToParquet(
@@ -494,14 +491,13 @@ private static ParquetBufferValue get3LevelListValue(
               subColumnFinder,
               defaultTimezone,
               insertRowsCurrIndex,
-              listGroupPath,
               true);
       listVal.add(Collections.singletonList(parsedValue.getValue()));
       estimatedParquetSize += parsedValue.getSize();
     }
     if (listVal.isEmpty()) {
       subColumnFinder
-          .getSubColumns(path)
+          .getSubColumns(type.getId().toString())
           .forEach(subColumn -> statsMap.get(subColumn).incCurrentNullCount());
     }
     return new ParquetBufferValue(listVal, estimatedParquetSize);
@@ -520,13 +516,12 @@ private static ParquetBufferValue get3LevelMapValue(
       Map<String, RowBufferStats> statsMap,
       SubColumnFinder subColumnFinder,
       ZoneId defaultTimezone,
-      final long insertRowsCurrIndex,
-      String path) {
+      final long insertRowsCurrIndex) {
     Map<?, ?> mapVal =
-        DataValidationUtil.validateAndParseIcebergMap(path, value, insertRowsCurrIndex);
+        DataValidationUtil.validateAndParseIcebergMap(
+            subColumnFinder.getDotPath(type.getId().toString()), value, insertRowsCurrIndex);
     List<Object> listVal = new ArrayList<>();
     float estimatedParquetSize = 0;
-    String mapGroupPath = concatDotPath(path, THREE_LEVEL_MAP_GROUP_NAME);
     for (Map.Entry<?, ?> entry : mapVal.entrySet()) {
       ParquetBufferValue parsedKey =
           parseColumnValueToParquet(
@@ -536,7 +531,6 @@ private static ParquetBufferValue get3LevelMapValue(
               subColumnFinder,
               defaultTimezone,
               insertRowsCurrIndex,
-              mapGroupPath,
               true);
       ParquetBufferValue parsedValue =
           parseColumnValueToParquet(
@@ -546,14 +540,13 @@ private static ParquetBufferValue get3LevelMapValue(
               subColumnFinder,
               defaultTimezone,
               insertRowsCurrIndex,
-              mapGroupPath,
               true);
       listVal.add(Arrays.asList(parsedKey.getValue(), parsedValue.getValue()));
       estimatedParquetSize += parsedKey.getSize() + parsedValue.getSize();
     }
     if (listVal.isEmpty()) {
       subColumnFinder
-          .getSubColumns(path)
+          .getSubColumns(type.getId().toString())
           .forEach(subColumn -> statsMap.get(subColumn).incCurrentNullCount());
     }
     return new ParquetBufferValue(listVal, estimatedParquetSize);
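Taken together, the changes in this file replace dot-path keys with field-id keys for every stats lookup; SubColumnFinder.getDotPath(id) now recovers the human-readable path only when an error message needs it. Below is a minimal, self-contained sketch of the new lookup pattern; Stats and Finder are illustrative stand-ins for RowBufferStats and SubColumnFinder, not SDK classes.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Sketch: per-column stats keyed by field id; dot paths used only for messages. */
public class FieldIdStatsSketch {

  /** Stand-in for RowBufferStats: tracks the null count of one leaf column. */
  static final class Stats {
    long nullCount;
  }

  /** Stand-in for SubColumnFinder: maps field ids to dot paths and leaf sub-column ids. */
  static final class Finder {
    final Map<String, String> dotPaths = new HashMap<>();
    final Map<String, List<String>> subColumns = new HashMap<>();

    String getDotPath(String id) {
      return dotPaths.get(id);
    }

    List<String> getSubColumns(String id) {
      return subColumns.get(id);
    }
  }

  public static void main(String[] args) {
    Finder finder = new Finder();
    // A struct "person" (field id 1) with leaves "person.name" (id 2) and "person.age" (id 3).
    finder.dotPaths.put("1", "person");
    finder.dotPaths.put("2", "person.name");
    finder.dotPaths.put("3", "person.age");
    finder.subColumns.put("1", List.of("2", "3"));

    Map<String, Stats> statsMap = new HashMap<>();
    statsMap.put("2", new Stats());
    statsMap.put("3", new Stats());

    // A null struct: bump the null count of every leaf by id, as the parser does above.
    for (String subId : finder.getSubColumns("1")) {
      statsMap.get(subId).nullCount++;
    }
    // The dot path is resolved only when a message needs a readable column name.
    System.out.println(
        "nulls under " + finder.getDotPath("1") + ": " + statsMap.get("2").nullCount);
  }
}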
ParquetRowBuffer.java
@@ -224,7 +224,7 @@ public void setupSchema(List<ColumnMetadata> columns) {
         int ordinal = schema.getType(columnDescriptor.getPath()[0]).getId().intValue();
 
         this.statsMap.put(
-            columnDotPath,
+            primitiveType.getId().toString(),
             new RowBufferStats(
                 columnDotPath,
                 null /* collationDefinitionString */,
@@ -237,7 +237,7 @@ public void setupSchema(List<ColumnMetadata> columns) {
         if (onErrorOption == OpenChannelRequest.OnErrorOption.ABORT
             || onErrorOption == OpenChannelRequest.OnErrorOption.SKIP_BATCH) {
           this.tempStatsMap.put(
-              columnDotPath,
+              primitiveType.getId().toString(),
               new RowBufferStats(
                   columnDotPath,
                   null /* collationDefinitionString */,
@@ -364,12 +364,13 @@ private float addRow(
         throw new SFException(ErrorCode.INTERNAL_ERROR, "SubColumnFinder is not initialized.");
       }
 
-      for (String subColumn : subColumnFinder.getSubColumns(columnName)) {
-        RowBufferStats stats = statsMap.get(subColumn);
+      for (String subColumnId :
+          subColumnFinder.getSubColumns(fieldIndex.get(columnName).type.getId().toString())) {
+        RowBufferStats stats = statsMap.get(subColumnId);
         if (stats == null) {
           throw new SFException(
               ErrorCode.INTERNAL_ERROR,
-              String.format("Column %s not found in stats map.", subColumn));
+              String.format("Field id %s not found in stats map.", subColumnId));
         }
         stats.incCurrentNullCount();
       }
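This file makes the matching buffer-side change: setupSchema registers stats under primitiveType.getId().toString() while still handing columnDotPath to RowBufferStats for display, and addRow walks sub-columns by id. A small sketch of that keying split, with ColumnStats as an illustrative stand-in for RowBufferStats and a made-up field id and path:

import java.util.HashMap;
import java.util.Map;

/** Sketch of the setupSchema pattern: key by field id, keep the dot path for display. */
public class StatsKeyingSketch {

  /** Stand-in for RowBufferStats: retains the dot path only for error messages. */
  record ColumnStats(String columnDisplayName) {}

  public static void main(String[] args) {
    Map<String, ColumnStats> statsMap = new HashMap<>();

    int fieldId = 4; // hypothetical Iceberg/Parquet field id
    String columnDotPath = "event.payload.amount"; // hypothetical dot path
    statsMap.put(Integer.toString(fieldId), new ColumnStats(columnDotPath));

    // Lookups (as in addRow) go through the id ...
    ColumnStats stats = statsMap.get("4");
    // ... while messages can still show the readable path.
    System.out.println("stats for field id 4 cover column " + stats.columnDisplayName());
  }
}

A plausible motivation, not stated in the commit itself: field ids stay stable when a column is renamed, so id-keyed stats remain valid where dot-path keys would dangle.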
IcebergDataTypeParser.java
@@ -44,6 +44,8 @@ public class IcebergDataTypeParser {
   private static final String ELEMENT_REQUIRED = "element-required";
   private static final String VALUE_REQUIRED = "value-required";
 
+  private static final String EMPTY_FIELD_CHAR = "\\";
+
   /** Object mapper for this class */
   private static final ObjectMapper MAPPER = new ObjectMapper();
 
@@ -65,16 +67,21 @@ public static org.apache.parquet.schema.Type parseIcebergDataTypeStringToParquet(
       int id,
       String name) {
     Type icebergType = deserializeIcebergType(icebergDataType);
+    org.apache.parquet.schema.Type parquetType;
     if (icebergType.isPrimitiveType()) {
-      return typeToMessageType.primitive(icebergType.asPrimitiveType(), repetition, id, name);
+      parquetType =
+          typeToMessageType.primitive(icebergType.asPrimitiveType(), repetition, id, name);
     } else {
       switch (icebergType.typeId()) {
         case LIST:
-          return typeToMessageType.list(icebergType.asListType(), repetition, id, name);
+          parquetType = typeToMessageType.list(icebergType.asListType(), repetition, id, name);
+          break;
         case MAP:
-          return typeToMessageType.map(icebergType.asMapType(), repetition, id, name);
+          parquetType = typeToMessageType.map(icebergType.asMapType(), repetition, id, name);
+          break;
         case STRUCT:
-          return typeToMessageType.struct(icebergType.asStructType(), repetition, id, name);
+          parquetType = typeToMessageType.struct(icebergType.asStructType(), repetition, id, name);
+          break;
         default:
           throw new SFException(
               ErrorCode.INTERNAL_ERROR,
@@ -83,6 +90,7 @@ public static org.apache.parquet.schema.Type parseIcebergDataTypeStringToParquet(
                   name, icebergDataType));
       }
     }
+    return replaceWithOriginalFieldName(parquetType, icebergType, name);
   }
 
   /**
@@ -154,7 +162,14 @@ public static Type getTypeFromJson(@Nonnull JsonNode jsonNode) {
             field.isObject(), "Cannot parse struct field from non-object: %s", field);
 
         int id = JsonUtil.getInt(ID, field);
-        String name = JsonUtil.getString(NAME, field);
+
+        /* TypeToMessageType throws on an empty field name; represent it with a lone backslash and escape any pre-existing backslashes. */
+        String name =
+            JsonUtil.getString(NAME, field)
+                .replace(EMPTY_FIELD_CHAR, EMPTY_FIELD_CHAR + EMPTY_FIELD_CHAR);
+        if (name.isEmpty()) {
+          name = EMPTY_FIELD_CHAR;
+        }
         Type type = getTypeFromJson(field.get(TYPE));
 
         String doc = JsonUtil.getStringOrNull(DOC, field);
@@ -208,4 +223,61 @@ public static Types.MapType mapFromJson(JsonNode json) {
       return Types.MapType.ofOptional(keyId, valueId, keyType, valueType);
     }
   }
+
+  private static org.apache.parquet.schema.Type replaceWithOriginalFieldName(
+      org.apache.parquet.schema.Type parquetType, Type icebergType, String fieldName) {
+    if (parquetType.isPrimitive() != icebergType.isPrimitiveType()
+        || (!parquetType.isPrimitive()
+            && parquetType.getLogicalTypeAnnotation()
+                == null /* ignore outer layer of map or list */
+            && parquetType.asGroupType().getFieldCount()
+                != icebergType.asNestedType().fields().size())) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Parquet type and Iceberg type mismatch: %s, %s", parquetType, icebergType));
+    }
+    if (parquetType.isPrimitive()) {
+      /* Rebuild the primitive with the original field name. */
+      return org.apache.parquet.schema.Types.primitive(
+              parquetType.asPrimitiveType().getPrimitiveTypeName(), parquetType.getRepetition())
+          .as(parquetType.asPrimitiveType().getLogicalTypeAnnotation())
+          .id(parquetType.getId().intValue())
+          .length(parquetType.asPrimitiveType().getTypeLength())
+          .named(fieldName);
+    } else {
+      org.apache.parquet.schema.Types.GroupBuilder<org.apache.parquet.schema.GroupType> builder =
+          org.apache.parquet.schema.Types.buildGroup(parquetType.getRepetition());
+      for (org.apache.parquet.schema.Type parquetFieldType :
+          parquetType.asGroupType().getFields()) {
+        if (parquetFieldType.getId() == null) {
+          /* Middle layer of a map or list. Skip this level: Parquet uses a 3-level list/map layout while Iceberg uses 2 levels. */
+          builder.addField(
+              replaceWithOriginalFieldName(
+                  parquetFieldType, icebergType, parquetFieldType.getName()));
+        } else {
+          Types.NestedField icebergField =
+              icebergType.asNestedType().field(parquetFieldType.getId().intValue());
+          if (icebergField == null) {
+            throw new IllegalArgumentException(
+                String.format(
+                    "Cannot find Iceberg field with id %d in Iceberg type: %s",
+                    parquetFieldType.getId().intValue(), icebergType));
+          }
+          builder.addField(
+              replaceWithOriginalFieldName(
+                  parquetFieldType,
+                  icebergField.type(),
+                  icebergField.name().equals(EMPTY_FIELD_CHAR)
+                      ? ""
+                      : icebergField
+                          .name()
+                          .replace(EMPTY_FIELD_CHAR + EMPTY_FIELD_CHAR, EMPTY_FIELD_CHAR)));
+        }
+      }
+      if (parquetType.getId() != null) {
+        builder.id(parquetType.getId().intValue());
+      }
+      return builder.as(parquetType.getLogicalTypeAnnotation()).named(fieldName);
+    }
+  }
 }
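The escaping added in getTypeFromJson and undone in replaceWithOriginalFieldName is an invertible pair: an empty field name becomes a single backslash, and any real backslash is doubled so the marker stays unambiguous. The standalone sketch below re-implements those two transformations to show the round trip; encode and decode are hypothetical helper names, not SDK methods.

/** Sketch: encode mirrors getTypeFromJson, decode mirrors replaceWithOriginalFieldName. */
public class EmptyFieldNameEscapeSketch {
  private static final String EMPTY_FIELD_CHAR = "\\";

  /** Escape backslashes first, then mark an empty name with a lone backslash. */
  static String encode(String name) {
    String escaped = name.replace(EMPTY_FIELD_CHAR, EMPTY_FIELD_CHAR + EMPTY_FIELD_CHAR);
    return escaped.isEmpty() ? EMPTY_FIELD_CHAR : escaped;
  }

  /** A lone backslash decodes to the empty name; otherwise unescape doubled backslashes. */
  static String decode(String name) {
    return name.equals(EMPTY_FIELD_CHAR)
        ? ""
        : name.replace(EMPTY_FIELD_CHAR + EMPTY_FIELD_CHAR, EMPTY_FIELD_CHAR);
  }

  public static void main(String[] args) {
    for (String original : new String[] {"", "\\", "a\\b", "plain"}) {
      String encoded = encode(original);
      if (!decode(encoded).equals(original)) {
        throw new AssertionError("round trip failed for: " + original);
      }
      System.out.println(
          "\"" + original + "\" -> \"" + encoded + "\" -> \"" + decode(encoded) + "\"");
    }
  }
}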