Structured data type support #798
Conversation
src/main/java/net/snowflake/ingest/streaming/internal/ParquetTypeGenerator.java (resolved)
          defaultTimezone,
          insertRowsCurrIndex)
      : IcebergParquetValueParser.parseColumnValueToParquet(
          value, parquetColumn.type, forkedStats, defaultTimezone, insertRowsCurrIndex));
no need to pass in columnMetadata / columnMetadata.sourceIcebergDataType?
I didn't use the column metadata because parquetColumn.type already includes all the information we need, including precision, scale, length, etc.
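For illustration, a minimal sketch (assuming a decimal column; the class and helper names are hypothetical) of how precision and scale can be read straight from the Parquet type's logical annotation, which is why ColumnMetadata isn't needed here:

```java
import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
import org.apache.parquet.schema.Type;

// Hypothetical helper: the Parquet Type node already carries precision and scale
// through its logical type annotation, so no separate ColumnMetadata is required.
final class ParquetTypeIntrospection {
  static int[] precisionAndScale(Type type) {
    DecimalLogicalTypeAnnotation decimal =
        (DecimalLogicalTypeAnnotation) type.asPrimitiveType().getLogicalTypeAnnotation();
    return new int[] {decimal.getPrecision(), decimal.getScale()};
  }
}
```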
 if (val != null) {
-  String fieldName = cols.get(i).getPath()[0];
+  String fieldName = cols.get(i).getName();
While this may be functionally equivalent, I'd not mix this into your current PR. Feel free to send a separate PR to replace getPath()[0] with getName().
The .getPath()[0] was used against ColumnDescriptor. The .getName() is used on Parquet.Type, as we need the information in Parquet.Type.
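For reference, a small illustrative contrast of the two calls (the class names are from parquet-mr; the wrapper methods are hypothetical):

```java
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.schema.Type;

// ColumnDescriptor describes a leaf column by its full dotted path, while a schema
// Type node only knows its own name; with nested groups the two are not the same thing.
final class NameLookupSketch {
  static String leafPathHead(ColumnDescriptor descriptor) {
    return descriptor.getPath()[0]; // first element of the leaf column's path
  }

  static String nodeName(Type type) {
    return type.getName(); // name of this schema node, including group (nested) types
  }
}
```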
for (Object o : values) {
  recordConsumer.startGroup();
  if (o != null) {
    write((List<Object>) o, cols.get(i).asGroupType());
should validate recursion level and fail beyond a threshold?
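A minimal, self-contained sketch of the kind of guard being suggested; the limit and the exception type are assumptions, not SDK values:

```java
import java.util.List;

// Hypothetical depth guard: reject values nested deeper than an assumed threshold
// before recursing into them.
final class NestingDepthGuard {
  private static final int MAX_NESTING_DEPTH = 100; // assumed limit, not from the SDK

  static void checkDepth(Object value, int depth) {
    if (depth > MAX_NESTING_DEPTH) {
      throw new IllegalArgumentException(
          "Nesting depth " + depth + " exceeds the maximum of " + MAX_NESTING_DEPTH);
    }
    if (value instanceof List) {
      for (Object element : (List<?>) value) {
        checkDepth(element, depth + 1); // one level deeper per nested list
      }
    }
  }
}
```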
    long insertRowsCurrIndex) {
  Utils.assertNotNull("Parquet column stats", stats);
  float estimatedParquetSize = 0F;
  estimatedParquetSize += ParquetBufferValue.DEFINITION_LEVEL_ENCODING_BYTE_LEN;
- Check out the comment on this constant; we are now going to have repetition levels, so does that change the size-estimation logic here?
- Note that FDN tables are ALSO adding support for structured data types; arguably we'll have to add support in this SDK for them too. Our changes need to treat "Iceberg data types" and "structured data type support" as separate but related concepts and not tie them together too tightly. That is, keep the door open for FDN data types + structured data type nesting.
Added repetition level encoding size estimation.
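Roughly, a self-contained sketch of the adjustment described (the one-byte constants are local stand-ins for the SDK's ParquetBufferValue constants, and the flag name is illustrative):

```java
// For repeated (list) columns, count a repetition-level byte on top of the
// definition-level byte when estimating the serialized size of a value.
final class SizeEstimateSketch {
  static final float DEFINITION_LEVEL_ENCODING_BYTE_LEN = 1F; // assumed 1 byte
  static final float REPETITION_LEVEL_ENCODING_BYTE_LEN = 1F; // assumed 1 byte

  static float estimateLevelOverhead(boolean insideRepeatedGroup) {
    float estimatedParquetSize = 0F;
    estimatedParquetSize += DEFINITION_LEVEL_ENCODING_BYTE_LEN;
    if (insideRepeatedGroup) {
      estimatedParquetSize += REPETITION_LEVEL_ENCODING_BYTE_LEN;
    }
    return estimatedParquetSize;
  }
}
```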
src/main/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParser.java (outdated, resolved)
}

if (value == null) {
  if (!type.isRepetition(Repetition.REQUIRED)) {
What about primitive types that are non-nullable but where the value is still null?
The logic was incorrect, fixed.
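For illustration, a sketch of the corrected check (the exception type is a stand-in for the SDK's validation error):

```java
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Type.Repetition;

// A null value is only acceptable when the Parquet field is not REQUIRED; a
// non-nullable primitive field with a null value is rejected up front.
final class NullabilityCheckSketch {
  static void checkNullability(Object value, Type type) {
    if (value == null && type.isRepetition(Repetition.REQUIRED)) {
      throw new IllegalArgumentException(
          "Column " + type.getName() + " is non-nullable but the value is null");
    }
  }
}
```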
c7452e0 to f020b1f
          type.getName(), value, insertRowsCurrIndex, Integer.class);
}
if (logicalTypeAnnotation instanceof DecimalLogicalTypeAnnotation) {
  return getDecimalValue(value, type, insertRowsCurrIndex).unscaledValue().intValue();
why not use columnMetadata.scale?
also, columnMetadata.precision isn't needed here?
Yep, checked the documentation; we do support specifying custom precision and scale for managed Iceberg tables' decimal fields. See here: https://docs.snowflake.com/en/user-guide/tables-iceberg-data-types
This also applies to long decimal.
The purpose of this function is to parse a Java type into Parquet bytes. The actual bytes of a Parquet decimal column are an unscaled integer; the scanner later uses the precision and scale in the schema to infer the correct value.
The columnMetadata isn't needed because the Parquet.Type already includes scale and precision; I used it here for decimal range validation.
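A minimal sketch of that flow, assuming a 32-bit decimal column (the class name, method name, and exception are illustrative):

```java
import java.math.BigDecimal;
import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
import org.apache.parquet.schema.PrimitiveType;

// The written value is the unscaled integer; precision and scale come from the
// Parquet type's logical annotation and are only used here for range validation.
final class DecimalParseSketch {
  static int decimalToInt32(BigDecimal value, PrimitiveType type) {
    DecimalLogicalTypeAnnotation decimal =
        (DecimalLogicalTypeAnnotation) type.getLogicalTypeAnnotation();
    BigDecimal scaled = value.setScale(decimal.getScale()); // throws if rounding would be needed
    if (scaled.precision() > decimal.getPrecision()) {
      throw new IllegalArgumentException("Value exceeds the declared precision");
    }
    return scaled.unscaledValue().intValue(); // the scanner re-applies scale from the schema
  }
}
```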
src/main/java/net/snowflake/ingest/streaming/internal/DataValidationUtil.java (outdated, resolved)
    String columnName, Object input, long insertRowIndex, Class<T> targetType) {
  if (input instanceof Number) {
    if (targetType.equals(Integer.class)) {
      return targetType.cast(((Number) input).intValue());
Can you check what the difference is between type.class.cast() and Number.intValue / Number.longValue / Number.floatValue / Number.doubleValue?
The type.class.cast is used for the return type check, and it's no longer needed after we split it into 4 different methods. The Number.xValue methods do a narrowing cast if the source type cannot fit into the target type.
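A quick, runnable illustration of that difference (the sample value is arbitrary):

```java
// Class.cast only checks the reference type, while Number.intValue() performs a
// narrowing primitive conversion when the source value does not fit the target.
public final class NarrowingDemo {
  public static void main(String[] args) {
    Number big = 4_000_000_000L;          // does not fit into an int
    System.out.println(big.intValue());   // narrows silently: prints -294967296
    // Integer.class.cast(big);           // would throw ClassCastException (a Long is not an Integer)
  }
}
```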
src/main/java/net/snowflake/ingest/streaming/internal/DataValidationUtil.java (outdated, resolved)
src/main/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParser.java (resolved)
          type.getName(), value, insertRowsCurrIndex, Long.class);
}
if (logicalTypeAnnotation instanceof DecimalLogicalTypeAnnotation) {
  return getDecimalValue(value, type, insertRowsCurrIndex).unscaledValue().longValue();
Are .longValue() here and .intValue() in the previous method guaranteed not to throw a casting exception?
According to the docs here, longValue and intValue return the low-order 8 and 4 bytes respectively. I don't think an exception will be thrown here.
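A small runnable check of that behaviour (the values are chosen just to show the truncation; BigInteger also offers intValueExact()/longValueExact() when an overflow exception is preferred):

```java
import java.math.BigInteger;

// BigInteger.longValue()/intValue() never throw: they return the low-order 8/4 bytes.
public final class UnscaledTruncationDemo {
  public static void main(String[] args) {
    BigInteger unscaled = BigInteger.valueOf(Long.MAX_VALUE).add(BigInteger.ONE); // 2^63
    System.out.println(unscaled.longValue()); // low 8 bytes: prints -9223372036854775808
    System.out.println(unscaled.intValue());  // low 4 bytes: prints 0
  }
}
```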
          defaultTimezone,
          !includeTimeZone,
          insertRowsCurrIndex)
      .toBinary(false)
We are removing the timezone in all cases; is that the correct behavior?
if (logicalTypeAnnotation == null) {
  byte[] bytes =
      DataValidationUtil.validateAndParseBinary(
          type.getName(), value, Optional.empty(), insertRowsCurrIndex);
why not validate against maxLength here?
Added max length validation. We don't need the max length from column metadata because the max lengths of Iceberg.binary (Snowflake.binary, 8 MB) and Iceberg.string (Snowflake.varchar, 16 MB) are constants.
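A self-contained sketch of those constant limits (the class and method names are illustrative):

```java
import java.nio.charset.StandardCharsets;

// Iceberg binary maps to Snowflake BINARY (max 8 MB) and Iceberg string maps to
// VARCHAR (max 16 MB), so the limits can be constants rather than per-column metadata.
final class IcebergLengthLimits {
  static final int BINARY_MAX_LENGTH_BYTES = 8 * 1024 * 1024;  // 8 MB
  static final int STRING_MAX_LENGTH_BYTES = 16 * 1024 * 1024; // 16 MB

  static void checkBinaryLength(byte[] bytes, String columnName) {
    if (bytes.length > BINARY_MAX_LENGTH_BYTES) {
      throw new IllegalArgumentException(
          "Binary value for " + columnName + " exceeds " + BINARY_MAX_LENGTH_BYTES + " bytes");
    }
  }

  static void checkStringLength(String value, String columnName) {
    if (value.getBytes(StandardCharsets.UTF_8).length > STRING_MAX_LENGTH_BYTES) {
      throw new IllegalArgumentException(
          "String value for " + columnName + " exceeds " + STRING_MAX_LENGTH_BYTES + " bytes");
    }
  }
}
```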
for (int i = 0; i < type.getFieldCount(); i++) {
  ParquetBufferValue parsedValue =
      parseColumnValueToParquet(
          structVal.getOrDefault(type.getFieldName(i), null),
Is null a properly handled value for all types of fields?
AFAIK, the leaf primitive type will always check whether it's nullable. Will add a test to verify it.
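For illustration of that path, a tiny runnable example (the field names are made up): a field missing from the input map is passed down as null via getOrDefault, and the nullability check on the leaf primitive type then decides whether to accept it.

```java
import java.util.Collections;
import java.util.Map;

// A missing struct field becomes null here; whether that is allowed is decided by
// the leaf field's Repetition (REQUIRED vs OPTIONAL), not at this level.
public final class MissingStructFieldDemo {
  public static void main(String[] args) {
    Map<String, Object> structVal = Collections.singletonMap("present_field", 42);
    Object missing = structVal.getOrDefault("absent_field", null);
    System.out.println(missing); // prints null; a REQUIRED leaf would reject this value
  }
}
```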
}
if (logicalTypeAnnotation instanceof TimestampLogicalTypeAnnotation) {
  boolean includeTimeZone =
      ((TimestampLogicalTypeAnnotation) logicalTypeAnnotation).isAdjustedToUTC();
I see timestamp and timestamptz as two separate types in this document - are we handling both?
https://docs.snowflake.com/en/user-guide/tables-iceberg-data-types
Yes, Iceberg.timestamp and Iceberg.timestamptz use the same LogicalTypeAnnotation with different adjustToUtc. Ref
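A short runnable illustration of that distinction using the parquet-mr factory (the MICROS unit is just an example):

```java
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit;
import org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation;

// Iceberg timestamp and timestamptz share the same Parquet timestamp annotation;
// only the isAdjustedToUTC flag differs.
public final class TimestampAnnotationDemo {
  public static void main(String[] args) {
    LogicalTypeAnnotation ntz =
        LogicalTypeAnnotation.timestampType(/* isAdjustedToUTC= */ false, TimeUnit.MICROS);
    LogicalTypeAnnotation tz =
        LogicalTypeAnnotation.timestampType(/* isAdjustedToUTC= */ true, TimeUnit.MICROS);
    System.out.println(((TimestampLogicalTypeAnnotation) ntz).isAdjustedToUTC()); // false -> Iceberg timestamp
    System.out.println(((TimestampLogicalTypeAnnotation) tz).isAdjustedToUTC());  // true  -> Iceberg timestamptz
  }
}
```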
parseColumnValueToParquet(
    structVal.getOrDefault(type.getFieldName(i), null),
    type.getType(i),
    stats,
We need the nested object's "metadata" to also be passed in for things like precision, scale, byte[] max length, etc.
However, we won't have a ColumnMetadata object in that situation, which means we now need an abstraction over ColumnMetadata for use in the value parsers. (I'm leaving another comment soon that we should look to have one ParquetValueParser class, as there are too many similarities for them to be completely independent classes, especially since we need to support structured data types for FDN types too some day.)
Support structured data types for streaming to Iceberg. Refer here for all supported data types. Tests are expected to be added.
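To make the end-to-end intent concrete, a hypothetical usage sketch: the channel variable, the column names, and the exact Java representations accepted for struct/array values are assumptions for illustration, not confirmed SDK behavior.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import net.snowflake.ingest.streaming.InsertValidationResponse;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;

// Sketch: nested Java collections passed as values for an Iceberg table's
// structured columns (Map for a struct/object column, List for an array column).
public final class StructuredInsertSketch {
  static void insertStructuredRow(SnowflakeStreamingIngestChannel channel) {
    Map<String, Object> structValue = new HashMap<>();
    structValue.put("street", "1 Main St"); // hypothetical struct fields
    structValue.put("zip", 12345);

    Map<String, Object> row = new HashMap<>();
    row.put("ADDRESS", structValue);               // hypothetical struct column
    row.put("TAGS", Arrays.asList("a", "b", "c")); // hypothetical array column

    InsertValidationResponse response = channel.insertRow(row, "offset-1");
    if (response.hasErrors()) {
      throw response.getInsertErrors().get(0).getException();
    }
  }
}
```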