Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNOW-1757554 Support quoted object fields name #869

Merged
merged 4 commits into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
package net.snowflake.ingest.streaming.internal;

import static net.snowflake.ingest.streaming.internal.DataValidationUtil.checkFixedLengthByteArray;
import static net.snowflake.ingest.utils.Utils.concatDotPath;
import static net.snowflake.ingest.utils.Utils.isNullOrEmpty;

import java.math.BigDecimal;
import java.math.BigInteger;
Expand Down Expand Up @@ -41,9 +39,6 @@
/** Parses a user Iceberg column value into Parquet internal representation for buffering. */
class IcebergParquetValueParser {

static final String THREE_LEVEL_MAP_GROUP_NAME = "key_value";
static final String THREE_LEVEL_LIST_GROUP_NAME = "list";

/**
* Parses a user column value into Parquet internal representation for buffering.
*
Expand All @@ -65,7 +60,7 @@ static ParquetBufferValue parseColumnValueToParquet(
long insertRowsCurrIndex) {
Utils.assertNotNull("Parquet column stats map", statsMap);
return parseColumnValueToParquet(
value, type, statsMap, subColumnFinder, defaultTimezone, insertRowsCurrIndex, null, false);
value, type, statsMap, subColumnFinder, defaultTimezone, insertRowsCurrIndex, false);
}

private static ParquetBufferValue parseColumnValueToParquet(
Expand All @@ -75,21 +70,26 @@ private static ParquetBufferValue parseColumnValueToParquet(
SubColumnFinder subColumnFinder,
ZoneId defaultTimezone,
long insertRowsCurrIndex,
String path,
boolean isDescendantsOfRepeatingGroup) {
path = isNullOrEmpty(path) ? type.getName() : concatDotPath(path, type.getName());
float estimatedParquetSize = 0F;

if (type.getId() == null) {
throw new SFException(
ErrorCode.INTERNAL_ERROR, String.format("Id not found for field: %s", type.getName()));
}
Type.ID id = type.getId();

if (type.isPrimitive()) {
if (!statsMap.containsKey(path)) {
if (!statsMap.containsKey(id.toString())) {
throw new SFException(
ErrorCode.INTERNAL_ERROR, String.format("Stats not found for column: %s", path));
ErrorCode.INTERNAL_ERROR, String.format("Stats not found. id=%s", type.getId()));
}
}

if (value != null) {
String path = subColumnFinder.getDotPath(id);
if (type.isPrimitive()) {
RowBufferStats stats = statsMap.get(path);
RowBufferStats stats = statsMap.get(id.toString());
estimatedParquetSize += ParquetBufferValue.DEFINITION_LEVEL_ENCODING_BYTE_LEN;
estimatedParquetSize +=
isDescendantsOfRepeatingGroup
Expand Down Expand Up @@ -160,7 +160,6 @@ private static ParquetBufferValue parseColumnValueToParquet(
subColumnFinder,
defaultTimezone,
insertRowsCurrIndex,
path,
isDescendantsOfRepeatingGroup);
}
}
Expand All @@ -169,13 +168,13 @@ private static ParquetBufferValue parseColumnValueToParquet(
if (type.isRepetition(Repetition.REQUIRED)) {
throw new SFException(
ErrorCode.INVALID_FORMAT_ROW,
path,
subColumnFinder.getDotPath(id),
String.format(
"Passed null to non nullable field, rowIndex:%d, column:%s",
insertRowsCurrIndex, path));
insertRowsCurrIndex, subColumnFinder.getDotPath(id)));
}
subColumnFinder
.getSubColumns(path)
.getSubColumns(id)
.forEach(subColumn -> statsMap.get(subColumn).incCurrentNullCount());
}

Expand Down Expand Up @@ -381,7 +380,6 @@ private static int timeUnitToScale(LogicalTypeAnnotation.TimeUnit timeUnit) {
* @param subColumnFinder helper class to find stats of sub-columns
* @param defaultTimezone default timezone to use for timestamp parsing
* @param insertRowsCurrIndex Used for logging the row of index given in insertRows API
* @param path dot path of the column
* @param isDescendantsOfRepeatingGroup true if the column is a descendant of a repeating group,
* @return list of parsed values
*/
Expand All @@ -392,7 +390,6 @@ private static ParquetBufferValue getGroupValue(
SubColumnFinder subColumnFinder,
ZoneId defaultTimezone,
final long insertRowsCurrIndex,
String path,
boolean isDescendantsOfRepeatingGroup) {
LogicalTypeAnnotation logicalTypeAnnotation = type.getLogicalTypeAnnotation();
if (logicalTypeAnnotation == null) {
Expand All @@ -403,19 +400,21 @@ private static ParquetBufferValue getGroupValue(
subColumnFinder,
defaultTimezone,
insertRowsCurrIndex,
path,
isDescendantsOfRepeatingGroup);
}
if (logicalTypeAnnotation instanceof LogicalTypeAnnotation.ListLogicalTypeAnnotation) {
return get3LevelListValue(
value, type, statsMap, subColumnFinder, defaultTimezone, insertRowsCurrIndex, path);
value, type, statsMap, subColumnFinder, defaultTimezone, insertRowsCurrIndex);
}
if (logicalTypeAnnotation instanceof LogicalTypeAnnotation.MapLogicalTypeAnnotation) {
return get3LevelMapValue(
value, type, statsMap, subColumnFinder, defaultTimezone, insertRowsCurrIndex, path);
value, type, statsMap, subColumnFinder, defaultTimezone, insertRowsCurrIndex);
}
throw new SFException(
ErrorCode.UNKNOWN_DATA_TYPE, path, logicalTypeAnnotation, type.getClass().getSimpleName());
ErrorCode.UNKNOWN_DATA_TYPE,
subColumnFinder.getDotPath(type.getId()),
logicalTypeAnnotation,
type.getClass().getSimpleName());
}

/**
Expand All @@ -430,10 +429,10 @@ private static ParquetBufferValue getStructValue(
SubColumnFinder subColumnFinder,
ZoneId defaultTimezone,
final long insertRowsCurrIndex,
String path,
boolean isDescendantsOfRepeatingGroup) {
Map<String, ?> structVal =
DataValidationUtil.validateAndParseIcebergStruct(path, value, insertRowsCurrIndex);
DataValidationUtil.validateAndParseIcebergStruct(
subColumnFinder.getDotPath(type.getId()), value, insertRowsCurrIndex);
Set<String> extraFields = new HashSet<>(structVal.keySet());
List<Object> listVal = new ArrayList<>(type.getFieldCount());
float estimatedParquetSize = 0f;
Expand All @@ -446,21 +445,19 @@ private static ParquetBufferValue getStructValue(
subColumnFinder,
defaultTimezone,
insertRowsCurrIndex,
path,
isDescendantsOfRepeatingGroup);
extraFields.remove(type.getFieldName(i));
listVal.add(parsedValue.getValue());
estimatedParquetSize += parsedValue.getSize();
}
if (!extraFields.isEmpty()) {
String extraFieldsStr =
extraFields.stream().map(f -> concatDotPath(path, f)).collect(Collectors.joining(", "));
String extraFieldsStr = extraFields.stream().collect(Collectors.joining(", ", "[", "]"));
throw new SFException(
ErrorCode.INVALID_FORMAT_ROW,
"Extra fields: " + extraFieldsStr,
String.format(
"Fields not present in the struct shouldn't be specified, rowIndex:%d",
insertRowsCurrIndex));
"Fields not present in the struct %s shouldn't be specified, rowIndex:%d",
subColumnFinder.getDotPath(type.getId()), insertRowsCurrIndex));
}
return new ParquetBufferValue(listVal, estimatedParquetSize);
}
Expand All @@ -478,13 +475,12 @@ private static ParquetBufferValue get3LevelListValue(
Map<String, RowBufferStats> statsMap,
SubColumnFinder subColumnFinder,
ZoneId defaultTimezone,
final long insertRowsCurrIndex,
String path) {
final long insertRowsCurrIndex) {
Iterable<?> iterableVal =
DataValidationUtil.validateAndParseIcebergList(path, value, insertRowsCurrIndex);
DataValidationUtil.validateAndParseIcebergList(
subColumnFinder.getDotPath(type.getId()), value, insertRowsCurrIndex);
List<Object> listVal = new ArrayList<>();
float estimatedParquetSize = 0;
String listGroupPath = concatDotPath(path, THREE_LEVEL_LIST_GROUP_NAME);
for (Object val : iterableVal) {
ParquetBufferValue parsedValue =
parseColumnValueToParquet(
Expand All @@ -494,14 +490,13 @@ private static ParquetBufferValue get3LevelListValue(
subColumnFinder,
defaultTimezone,
insertRowsCurrIndex,
listGroupPath,
true);
listVal.add(Collections.singletonList(parsedValue.getValue()));
estimatedParquetSize += parsedValue.getSize();
}
if (listVal.isEmpty()) {
subColumnFinder
.getSubColumns(path)
.getSubColumns(type.getId())
.forEach(subColumn -> statsMap.get(subColumn).incCurrentNullCount());
}
return new ParquetBufferValue(listVal, estimatedParquetSize);
Expand All @@ -520,13 +515,12 @@ private static ParquetBufferValue get3LevelMapValue(
Map<String, RowBufferStats> statsMap,
SubColumnFinder subColumnFinder,
ZoneId defaultTimezone,
final long insertRowsCurrIndex,
String path) {
final long insertRowsCurrIndex) {
Map<?, ?> mapVal =
DataValidationUtil.validateAndParseIcebergMap(path, value, insertRowsCurrIndex);
DataValidationUtil.validateAndParseIcebergMap(
subColumnFinder.getDotPath(type.getId()), value, insertRowsCurrIndex);
List<Object> listVal = new ArrayList<>();
float estimatedParquetSize = 0;
String mapGroupPath = concatDotPath(path, THREE_LEVEL_MAP_GROUP_NAME);
for (Map.Entry<?, ?> entry : mapVal.entrySet()) {
ParquetBufferValue parsedKey =
parseColumnValueToParquet(
Expand All @@ -536,7 +530,6 @@ private static ParquetBufferValue get3LevelMapValue(
subColumnFinder,
defaultTimezone,
insertRowsCurrIndex,
mapGroupPath,
true);
ParquetBufferValue parsedValue =
parseColumnValueToParquet(
Expand All @@ -546,14 +539,13 @@ private static ParquetBufferValue get3LevelMapValue(
subColumnFinder,
defaultTimezone,
insertRowsCurrIndex,
mapGroupPath,
true);
listVal.add(Arrays.asList(parsedKey.getValue(), parsedValue.getValue()));
estimatedParquetSize += parsedKey.getSize() + parsedValue.getSize();
}
if (listVal.isEmpty()) {
subColumnFinder
.getSubColumns(path)
.getSubColumns(type.getId())
.forEach(subColumn -> statsMap.get(subColumn).incCurrentNullCount());
}
return new ParquetBufferValue(listVal, estimatedParquetSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ public void setupSchema(List<ColumnMetadata> columns) {
int ordinal = schema.getType(columnDescriptor.getPath()[0]).getId().intValue();

this.statsMap.put(
columnDotPath,
primitiveType.getId().toString(),
new RowBufferStats(
columnDotPath,
null /* collationDefinitionString */,
Expand All @@ -237,7 +237,7 @@ public void setupSchema(List<ColumnMetadata> columns) {
if (onErrorOption == OpenChannelRequest.OnErrorOption.ABORT
|| onErrorOption == OpenChannelRequest.OnErrorOption.SKIP_BATCH) {
this.tempStatsMap.put(
columnDotPath,
primitiveType.getId().toString(),
new RowBufferStats(
columnDotPath,
null /* collationDefinitionString */,
Expand Down Expand Up @@ -364,12 +364,16 @@ private float addRow(
throw new SFException(ErrorCode.INTERNAL_ERROR, "SubColumnFinder is not initialized.");
}

for (String subColumn : subColumnFinder.getSubColumns(columnName)) {
RowBufferStats stats = statsMap.get(subColumn);
for (String subColumnId :
subColumnFinder.getSubColumns(fieldIndex.get(columnName).type.getId())) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fieldIndex.get(columnName) can return null — let's handle that and throw explicitly? OK to do in your next PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The columnName is from Set.difference(fieldIndex.keySet(), otherSet). I think this should be safe.

RowBufferStats stats = statsMap.get(subColumnId);
if (stats == null) {
throw new SFException(
ErrorCode.INTERNAL_ERROR,
String.format("Column %s not found in stats map.", subColumn));
String.format(
"Entry not found in stats map. fieldId=%s, column=%s.",
subColumnId,
subColumnFinder.getDotPath(new Type.ID(Integer.parseInt(subColumnId)))));
}
stats.incCurrentNullCount();
}
Expand Down
Loading
Loading