address comments

sfc-gh-alhuang committed Nov 5, 2024
1 parent ba48f45 commit d939c5a
Showing 8 changed files with 292 additions and 68 deletions.
@@ -77,20 +77,19 @@ private static ParquetBufferValue parseColumnValueToParquet(
throw new SFException(
ErrorCode.INTERNAL_ERROR, String.format("Id not found for field: %s", type.getName()));
}
-    String id = type.getId().toString();
+    Type.ID id = type.getId();

if (type.isPrimitive()) {
-      if (!statsMap.containsKey(id)) {
+      if (!statsMap.containsKey(id.toString())) {
throw new SFException(
-            ErrorCode.INTERNAL_ERROR,
-            String.format("Stats not found for fieldId: %s", type.getId()));
+            ErrorCode.INTERNAL_ERROR, String.format("Stats not found. id=%s", type.getId()));
}
}

if (value != null) {
String path = subColumnFinder.getDotPath(id);
if (type.isPrimitive()) {
-      RowBufferStats stats = statsMap.get(id);
+      RowBufferStats stats = statsMap.get(id.toString());
estimatedParquetSize += ParquetBufferValue.DEFINITION_LEVEL_ENCODING_BYTE_LEN;
estimatedParquetSize +=
isDescendantsOfRepeatingGroup
@@ -413,7 +412,7 @@ private static ParquetBufferValue getGroupValue(
}
throw new SFException(
ErrorCode.UNKNOWN_DATA_TYPE,
-        subColumnFinder.getDotPath(type.getId().toString()),
+        subColumnFinder.getDotPath(type.getId()),
logicalTypeAnnotation,
type.getClass().getSimpleName());
}
@@ -433,7 +432,7 @@ private static ParquetBufferValue getStructValue(
boolean isDescendantsOfRepeatingGroup) {
Map<String, ?> structVal =
DataValidationUtil.validateAndParseIcebergStruct(
-            subColumnFinder.getDotPath(type.getId().toString()), value, insertRowsCurrIndex);
+            subColumnFinder.getDotPath(type.getId()), value, insertRowsCurrIndex);
Set<String> extraFields = new HashSet<>(structVal.keySet());
List<Object> listVal = new ArrayList<>(type.getFieldCount());
float estimatedParquetSize = 0f;
@@ -458,7 +457,7 @@ private static ParquetBufferValue getStructValue(
"Extra fields: " + extraFieldsStr,
String.format(
"Fields not present in the struct %s shouldn't be specified, rowIndex:%d",
-              subColumnFinder.getDotPath(type.getId().toString()), insertRowsCurrIndex));
+              subColumnFinder.getDotPath(type.getId()), insertRowsCurrIndex));
}
return new ParquetBufferValue(listVal, estimatedParquetSize);
}
@@ -479,7 +478,7 @@ private static ParquetBufferValue get3LevelListValue(
final long insertRowsCurrIndex) {
Iterable<?> iterableVal =
DataValidationUtil.validateAndParseIcebergList(
-            subColumnFinder.getDotPath(type.getId().toString()), value, insertRowsCurrIndex);
+            subColumnFinder.getDotPath(type.getId()), value, insertRowsCurrIndex);
List<Object> listVal = new ArrayList<>();
float estimatedParquetSize = 0;
for (Object val : iterableVal) {
@@ -497,7 +496,7 @@ private static ParquetBufferValue get3LevelListValue(
}
if (listVal.isEmpty()) {
subColumnFinder
-          .getSubColumns(type.getId().toString())
+          .getSubColumns(type.getId())
.forEach(subColumn -> statsMap.get(subColumn).incCurrentNullCount());
}
return new ParquetBufferValue(listVal, estimatedParquetSize);
@@ -519,7 +518,7 @@ private static ParquetBufferValue get3LevelMapValue(
final long insertRowsCurrIndex) {
Map<?, ?> mapVal =
DataValidationUtil.validateAndParseIcebergMap(
-            subColumnFinder.getDotPath(type.getId().toString()), value, insertRowsCurrIndex);
+            subColumnFinder.getDotPath(type.getId()), value, insertRowsCurrIndex);
List<Object> listVal = new ArrayList<>();
float estimatedParquetSize = 0;
for (Map.Entry<?, ?> entry : mapVal.entrySet()) {
@@ -546,7 +545,7 @@ private static ParquetBufferValue get3LevelMapValue(
}
if (listVal.isEmpty()) {
subColumnFinder
-          .getSubColumns(type.getId().toString())
+          .getSubColumns(type.getId())
.forEach(subColumn -> statsMap.get(subColumn).incCurrentNullCount());
}
return new ParquetBufferValue(listVal, estimatedParquetSize);
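The hunks above make Parquet's Type.ID the canonical identifier and call toString() only at the boundary where the stats map is keyed. A minimal sketch of that lookup pattern — the schema, map contents, and class name here are invented for illustration, not taken from the SDK:

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;

public class TypeIdLookupSketch {
  public static void main(String[] args) {
    // A one-column schema whose field carries Iceberg field id 1.
    MessageType schema =
        Types.buildMessage()
            .addField(Types.required(PrimitiveTypeName.INT32).id(1).named("a"))
            .named("schema");

    // The stats map stays keyed by the string form of the field id, as in the diff.
    Map<String, String> statsMap = new HashMap<>();
    statsMap.put("1", "stats-for-a");

    // Carry the id around as Type.ID and convert only at the map boundary.
    Type.ID id = schema.getType("a").getId();
    System.out.println(statsMap.get(id.toString())); // prints stats-for-a
  }
}
```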
@@ -365,12 +365,15 @@ private float addRow(
}

for (String subColumnId :
-        subColumnFinder.getSubColumns(fieldIndex.get(columnName).type.getId().toString())) {
+        subColumnFinder.getSubColumns(fieldIndex.get(columnName).type.getId())) {
RowBufferStats stats = statsMap.get(subColumnId);
if (stats == null) {
throw new SFException(
ErrorCode.INTERNAL_ERROR,
String.format("Field id %s not found in stats map.", subColumnId));
String.format(
"Entry not found in stats map. fieldId=%s, column=%s.",
subColumnId,
subColumnFinder.getDotPath(new Type.ID(Integer.parseInt(subColumnId)))));
}
stats.incCurrentNullCount();
}
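The new error message rebuilds a Type.ID from the stats-map key to recover a human-readable dot path. That relies on Type.ID.toString() printing exactly the decimal field id, so the string key and the id round-trip. A tiny standalone check of that assumption (not SDK code):

```java
import org.apache.parquet.schema.Type;

public class TypeIdRoundTrip {
  public static void main(String[] args) {
    Type.ID id = new Type.ID(42);
    String key = id.toString();              // "42" — the stats-map key form
    Type.ID back = new Type.ID(Integer.parseInt(key));
    System.out.println(id.equals(back));     // true: safe to rebuild the id from the key
  }
}
```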
77 changes: 44 additions & 33 deletions src/main/java/net/snowflake/ingest/utils/IcebergDataTypeParser.java
@@ -224,6 +224,18 @@ public static Types.MapType mapFromJson(JsonNode json) {
}
}

+  /**
+   * Replace the field names in the parquet schema with the original field names in the Iceberg
+   * schema. The parquet schema parsed by TypeToMessageType may have different field names than the
+   * original Iceberg schema, as it uses AvroSchemaUtil to encode the field names. See {@link
+   * TypeToMessageType} and {@link org.apache.iceberg.avro.AvroSchemaUtil#makeCompatibleName} for
+   * more details.
+   *
+   * @param parquetType parquet schema type
+   * @param icebergType iceberg schema type
+   * @param fieldName original field name in the Iceberg schema
+   * @return parquet schema type with the original field name
+   */
private static org.apache.parquet.schema.Type replaceWithOriginalFieldName(
org.apache.parquet.schema.Type parquetType, Type icebergType, String fieldName) {
if (parquetType.isPrimitive() != icebergType.isPrimitiveType()
@@ -234,7 +246,8 @@ private static org.apache.parquet.schema.Type replaceWithOriginalFieldName(
!= icebergType.asNestedType().fields().size())) {
throw new IllegalArgumentException(
String.format(
"Parquet type and Iceberg type mismatch: %s, %s", parquetType, icebergType));
"Parquet type and Iceberg type mismatch. parquetType=%s, icebergType=%s",
parquetType, icebergType));
}
if (parquetType.isPrimitive()) {
/* rename field name */
@@ -244,40 +257,38 @@ private static org.apache.parquet.schema.Type replaceWithOriginalFieldName(
.id(parquetType.getId().intValue())
.length(parquetType.asPrimitiveType().getTypeLength())
.named(fieldName);
-    } else {
-      org.apache.parquet.schema.Types.GroupBuilder<org.apache.parquet.schema.GroupType> builder =
-          org.apache.parquet.schema.Types.buildGroup(parquetType.getRepetition());
-      for (org.apache.parquet.schema.Type parquetFieldType :
-          parquetType.asGroupType().getFields()) {
-        if (parquetFieldType.getId() == null) {
-          /* middle layer of map or list. Skip this level as parquet's using 3-level list/map while iceberg's using 2-level list/map */
-          builder.addField(
-              replaceWithOriginalFieldName(
-                  parquetFieldType, icebergType, parquetFieldType.getName()));
-        } else {
-          Types.NestedField icebergField =
-              icebergType.asNestedType().field(parquetFieldType.getId().intValue());
-          if (icebergField == null) {
-            throw new IllegalArgumentException(
-                String.format(
-                    "Cannot find Iceberg field with id %d in Iceberg type: %s",
-                    parquetFieldType.getId().intValue(), icebergType));
-          }
-          builder.addField(
-              replaceWithOriginalFieldName(
-                  parquetFieldType,
-                  icebergField.type(),
-                  icebergField.name().equals(EMPTY_FIELD_CHAR)
-                      ? ""
-                      : icebergField
-                          .name()
-                          .replace(EMPTY_FIELD_CHAR + EMPTY_FIELD_CHAR, EMPTY_FIELD_CHAR)));
-        }
+    org.apache.parquet.schema.Types.GroupBuilder<org.apache.parquet.schema.GroupType> builder =
+        org.apache.parquet.schema.Types.buildGroup(parquetType.getRepetition());
+    for (org.apache.parquet.schema.Type parquetFieldType : parquetType.asGroupType().getFields()) {
+      if (parquetFieldType.getId() == null) {
+        /* middle layer of map or list. Skip this level as parquet's using 3-level list/map while iceberg's using 2-level list/map */
+        builder.addField(
+            replaceWithOriginalFieldName(
+                parquetFieldType, icebergType, parquetFieldType.getName()));
+      } else {
+        Types.NestedField icebergField =
+            icebergType.asNestedType().field(parquetFieldType.getId().intValue());
+        if (icebergField == null) {
+          throw new IllegalArgumentException(
+              String.format(
+                  "Cannot find Iceberg field with id %d. parquetFieldType=%s, icebergType=%s",
+                  parquetFieldType.getId().intValue(), parquetFieldType, icebergType));
+        }
+        builder.addField(
+            replaceWithOriginalFieldName(
+                parquetFieldType,
+                icebergField.type(),
+                icebergField.name().equals(EMPTY_FIELD_CHAR)
+                    ? "" /* Empty strings are encoded as a single backslash in #structFromJson. Decode them here. */
+                    : icebergField
+                        .name()
+                        .replace(EMPTY_FIELD_CHAR + EMPTY_FIELD_CHAR, EMPTY_FIELD_CHAR)));
+      }
-      if (parquetType.getId() != null) {
-        builder.id(parquetType.getId().intValue());
-      }
-      return builder.as(parquetType.getLogicalTypeAnnotation()).named(fieldName);
-    }
+    if (parquetType.getId() != null) {
+      builder.id(parquetType.getId().intValue());
+    }
+    return builder.as(parquetType.getLogicalTypeAnnotation()).named(fieldName);
}
}
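For primitives, replaceWithOriginalFieldName rebuilds the type under the Iceberg-side name while preserving repetition, logical type, field id, and type length. A self-contained sketch of just that rename step using Parquet's schema builders; the sanitized input name below is hypothetical (the real one would come from AvroSchemaUtil.makeCompatibleName):

```java
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;

public class RenamePrimitiveSketch {
  // Rebuild a primitive type under a new name, keeping everything else intact.
  static Type rename(PrimitiveType t, String newName) {
    return Types.primitive(t.getPrimitiveTypeName(), t.getRepetition())
        .as(t.getLogicalTypeAnnotation())
        .id(t.getId().intValue())
        .length(t.getTypeLength())
        .named(newName);
  }

  public static void main(String[] args) {
    // Hypothetical sanitized name; the exact encoding is AvroSchemaUtil's concern.
    PrimitiveType sanitized =
        Types.required(PrimitiveTypeName.BINARY).id(2).named("col_x2Ename");
    System.out.println(rename(sanitized, "col.name"));
    // prints something like: required binary col.name = 2
  }
}
```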
55 changes: 41 additions & 14 deletions src/main/java/net/snowflake/ingest/utils/SubColumnFinder.java
@@ -14,11 +14,21 @@
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;

-/** Helper class to find all leaf sub-columns in an immutable schema given a fieldId. */
+/** Helper class to find all leaf columns in an immutable schema given a fieldId. */
public class SubColumnFinder {

+  /**
+   * Helper class to store the interval of leaf columns that belong to a node (start and end index
+   * into the list) and the node's dot path.
+   */
static class SubtreeInfo {
+    /* Start index (inclusive) of the node's leaf columns in the list. */
final int startTag;

+    /* End index (exclusive) of the node's leaf columns in the list. */
final int endTag;

+    /* Dot path of the node. */
final String dotPath;

SubtreeInfo(int startTag, int endTag, String dotPath) {
@@ -28,53 +38,70 @@ static class SubtreeInfo {
}
}

+  /* A list of all leaf columns' field ids, in preorder-traversal order. */
private final List<String> list;
-  private final Map<String, SubtreeInfo> accessMap;

+  /* A map to cache query results and avoid recursive queries at runtime. */
+  private final Map<Type.ID, SubtreeInfo> accessMap;

public SubColumnFinder(MessageType schema) {
accessMap = new HashMap<>();
list = new ArrayList<>();
build(schema, null);
}

-  public List<String> getSubColumns(String id) {
+  /**
+   * Get all leaf sub-columns' field ids for a node in the schema.
+   *
+   * @param id Field id of the node
+   * @return List of sub-columns' field ids
+   */
+  public List<String> getSubColumns(Type.ID id) {
if (!accessMap.containsKey(id)) {
throw new IllegalArgumentException(String.format("Field %s not found in schema", id));
}
SubtreeInfo interval = accessMap.get(id);
return Collections.unmodifiableList(list.subList(interval.startTag, interval.endTag));
}

-  public String getDotPath(String id) {
+  /**
+   * Get the dot path of a node in the schema.
+   *
+   * @param id Field id of the node
+   * @return Dot path of the node
+   */
+  public String getDotPath(Type.ID id) {
if (!accessMap.containsKey(id)) {
throw new IllegalArgumentException(String.format("Field %s not found in schema", id));
}
return accessMap.get(id).dotPath;
}

-  private void build(Type node, List<String> path) {
-    if (path == null) {
+  private void build(Type node, List<String> currentPath) {
+    if (currentPath == null) {
/* Ignore root node type name (bdec or schema) */
-      path = new ArrayList<>();
+      currentPath = new ArrayList<>();
} else {
-      path.add(node.getName());
+      currentPath.add(node.getName());
}

int startTag = list.size();
if (!node.isPrimitive()) {
for (Type child : node.asGroupType().getFields()) {
-        build(child, path);
+        build(child, currentPath);
}
} else {
list.add(node.getId().toString());
}
-    if (!path.isEmpty() && node.getId() != null) {
+    if (!currentPath.isEmpty() && node.getId() != null) {
accessMap.put(
-          node.getId().toString(),
-          new SubtreeInfo(startTag, list.size(), concatDotPath(path.toArray(new String[0]))));
+          node.getId(),
+          new SubtreeInfo(
+              startTag, list.size(), concatDotPath(currentPath.toArray(new String[0]))));
}
-    if (!path.isEmpty()) {
-      path.remove(path.size() - 1);
+    if (!currentPath.isEmpty()) {
+      /* Remove the last element of the path at the end of recursion. */
+      currentPath.remove(currentPath.size() - 1);
}
}
}
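SubColumnFinder flattens all leaf field ids into one preorder list and maps each node's field id to the contiguous interval of leaves beneath it, so getSubColumns is just a sublist view. A hypothetical usage sketch, assuming this SDK class on the classpath; the schema and the expected outputs in the comments are illustrative, derived from the traversal logic above:

```java
import net.snowflake.ingest.utils.SubColumnFinder;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;

public class SubColumnFinderDemo {
  public static void main(String[] args) {
    // schema: obj (id 1) { x: int32 (id 2), y: int64 (id 3) }
    MessageType schema =
        Types.buildMessage()
            .addField(
                Types.requiredGroup()
                    .addField(Types.required(PrimitiveTypeName.INT32).id(2).named("x"))
                    .addField(Types.required(PrimitiveTypeName.INT64).id(3).named("y"))
                    .id(1)
                    .named("obj"))
            .named("schema");

    SubColumnFinder finder = new SubColumnFinder(schema);

    // The group's interval covers both leaves: expect ["2", "3"].
    System.out.println(finder.getSubColumns(new Type.ID(1)));
    // Dot paths exclude the root message name: expect "obj.x".
    System.out.println(finder.getDotPath(new Type.ID(2)));
  }
}
```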