Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-alhuang committed Sep 30, 2024
1 parent efb4a3f commit f8303fa
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1082,7 +1082,8 @@ static int validateAndParseBoolean(String columnName, Object input, long insertR
throw new SFException(
ErrorCode.INVALID_FORMAT_ROW,
String.format(
"Field name of a struct must be of type String, rowIndex:%d", insertRowIndex));
"Field name of struct %s must be of type String, rowIndex:%d",
columnName, insertRowIndex));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.ErrorCode;
Expand Down Expand Up @@ -396,14 +395,7 @@ private static ParquetBufferValue getGroupValue(
return get3LevelListValue(value, type, statsMap, defaultTimezone, insertRowsCurrIndex, path);
}
if (logicalTypeAnnotation instanceof LogicalTypeAnnotation.MapLogicalTypeAnnotation) {
return get3LevelMapValue(
value,
type,
statsMap,
defaultTimezone,
insertRowsCurrIndex,
path,
isDescendantsOfRepeatingGroup);
return get3LevelMapValue(value, type, statsMap, defaultTimezone, insertRowsCurrIndex, path);
}
throw new SFException(
ErrorCode.UNKNOWN_DATA_TYPE, logicalTypeAnnotation, type.getClass().getSimpleName());
Expand Down Expand Up @@ -442,11 +434,11 @@ private static ParquetBufferValue getStructValue(
estimatedParquetSize += parsedValue.getSize();
}
if (!extraFields.isEmpty()) {
extraFields =
extraFields.stream().map(f -> concatDotPath(path, f)).collect(Collectors.toSet());
String extraFieldsStr =
extraFields.stream().map(f -> concatDotPath(path, f)).collect(Collectors.joining(", "));
throw new SFException(
ErrorCode.INVALID_FORMAT_ROW,
"Extra fields: " + extraFields,
"Extra fields: " + extraFieldsStr,
String.format(
"Fields not present in the struct shouldn't be specified, rowIndex:%d",
insertRowsCurrIndex));
Expand All @@ -471,22 +463,22 @@ private static ParquetBufferValue get3LevelListValue(
Iterable<?> iterableVal =
DataValidationUtil.validateAndParseIcebergList(path, value, insertRowsCurrIndex);
List<Object> listVal = new ArrayList<>();
final AtomicReference<Float> estimatedParquetSize = new AtomicReference<>(0f);
iterableVal.forEach(
element -> {
ParquetBufferValue parsedValue =
parseColumnValueToParquet(
element,
type.getType(0).asGroupType().getType(0),
statsMap,
defaultTimezone,
insertRowsCurrIndex,
concatDotPath(path, THREE_LEVEL_LIST_GROUP_NAME),
true);
listVal.add(Collections.singletonList(parsedValue.getValue()));
estimatedParquetSize.updateAndGet(sz -> sz + parsedValue.getSize());
});
return new ParquetBufferValue(listVal, estimatedParquetSize.get());
float estimatedParquetSize = 0;
String listGroupPath = concatDotPath(path, THREE_LEVEL_LIST_GROUP_NAME);
for (Object val : iterableVal) {
ParquetBufferValue parsedValue =
parseColumnValueToParquet(
val,
type.getType(0).asGroupType().getType(0),
statsMap,
defaultTimezone,
insertRowsCurrIndex,
listGroupPath,
true);
listVal.add(Collections.singletonList(parsedValue.getValue()));
estimatedParquetSize += parsedValue.getSize();
}
return new ParquetBufferValue(listVal, estimatedParquetSize);
}

/**
Expand All @@ -502,35 +494,34 @@ private static ParquetBufferValue get3LevelMapValue(
Map<String, RowBufferStats> statsMap,
ZoneId defaultTimezone,
final long insertRowsCurrIndex,
String path,
boolean isDescendantsOfRepeatingGroup) {
String path) {
Map<?, ?> mapVal =
DataValidationUtil.validateAndParseIcebergMap(path, value, insertRowsCurrIndex);
List<Object> listVal = new ArrayList<>();
final AtomicReference<Float> estimatedParquetSize = new AtomicReference<>(0f);
mapVal.forEach(
(k, v) -> {
ParquetBufferValue parsedKey =
parseColumnValueToParquet(
k,
type.getType(0).asGroupType().getType(0),
statsMap,
defaultTimezone,
insertRowsCurrIndex,
concatDotPath(path, THREE_LEVEL_MAP_GROUP_NAME),
true);
ParquetBufferValue parsedValue =
parseColumnValueToParquet(
v,
type.getType(0).asGroupType().getType(1),
statsMap,
defaultTimezone,
insertRowsCurrIndex,
concatDotPath(path, THREE_LEVEL_MAP_GROUP_NAME),
isDescendantsOfRepeatingGroup);
listVal.add(Arrays.asList(parsedKey.getValue(), parsedValue.getValue()));
estimatedParquetSize.updateAndGet(sz -> sz + parsedKey.getSize() + parsedValue.getSize());
});
return new ParquetBufferValue(listVal, estimatedParquetSize.get());
float estimatedParquetSize = 0;
String mapGroupPath = concatDotPath(path, THREE_LEVEL_MAP_GROUP_NAME);
for (Map.Entry<?, ?> entry : mapVal.entrySet()) {
ParquetBufferValue parsedKey =
parseColumnValueToParquet(
entry.getKey(),
type.getType(0).asGroupType().getType(0),
statsMap,
defaultTimezone,
insertRowsCurrIndex,
mapGroupPath,
true);
ParquetBufferValue parsedValue =
parseColumnValueToParquet(
entry.getValue(),
type.getType(0).asGroupType().getType(1),
statsMap,
defaultTimezone,
insertRowsCurrIndex,
mapGroupPath,
true);
listVal.add(Arrays.asList(parsedKey.getValue(), parsedValue.getValue()));
estimatedParquetSize += parsedKey.getSize() + parsedValue.getSize();
}
return new ParquetBufferValue(listVal, estimatedParquetSize);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,8 @@ public void parseMap() throws JsonProcessingException {
convertToArrayList(objectMapper.readValue("[[1, 1], [2, 2]]", ArrayList.class)))
.expectedSize(
(4.0f + REPETITION_LEVEL_ENCODING_BYTE_LEN + DEFINITION_LEVEL_ENCODING_BYTE_LEN) * 2
+ (4.0f + DEFINITION_LEVEL_ENCODING_BYTE_LEN) * 2)
+ (4.0f + REPETITION_LEVEL_ENCODING_BYTE_LEN + DEFINITION_LEVEL_ENCODING_BYTE_LEN)
* 2)
.expectedMin(BigInteger.valueOf(1))
.expectedMax(BigInteger.valueOf(2))
.assertMatches();
Expand Down Expand Up @@ -630,12 +631,13 @@ public void parseStruct() throws JsonProcessingException {
0));
ParquetBufferValue pv =
IcebergParquetValueParser.parseColumnValueToParquet(
new java.util.HashMap<String, Object>() {
{
// a is null
put("b", "2");
}
},
Collections.unmodifiableMap(
new java.util.HashMap<String, Object>() {
{
// a is null
put("b", "2");
}
}),
struct,
rowBufferStatsMap,
UTC,
Expand Down

0 comments on commit f8303fa

Please sign in to comment.