SNOW-1507007 Support schema for new table format #814
Conversation
@@ -61,6 +61,8 @@
"org.bouncycastle:bcpkix-jdk18on": BOUNCY_CASTLE_LICENSE,
"org.bouncycastle:bcutil-jdk18on": BOUNCY_CASTLE_LICENSE,
"org.bouncycastle:bcprov-jdk18on": BOUNCY_CASTLE_LICENSE,
"org.roaringbitmap:RoaringBitmap": APACHE_LICENSE,
I can't find this library being used anywhere; why is this change needed?
It's a transitive dependency of org.apache.iceberg:iceberg-core:
[INFO] +- org.apache.iceberg:iceberg-core:jar:1.3.1:compile
[INFO] | +- org.apache.iceberg:iceberg-common:jar:1.3.1:runtime
[INFO] | +- org.apache.avro:avro:jar:1.11.1:runtime
[INFO] | +- org.apache.httpcomponents.client5:httpclient5:jar:5.2.1:runtime
[INFO] | | +- org.apache.httpcomponents.core5:httpcore5:jar:5.2:runtime
[INFO] | | \- org.apache.httpcomponents.core5:httpcore5-h2:jar:5.2:runtime
[INFO] | \- org.roaringbitmap:RoaringBitmap:jar:0.9.44:runtime
[INFO] | \- org.roaringbitmap:shims:jar:0.9.44:runtime
src/main/java/net/snowflake/ingest/streaming/internal/ColumnMetadata.java
src/main/java/net/snowflake/ingest/streaming/internal/OpenChannelRequestInternal.java
src/main/java/net/snowflake/ingest/streaming/internal/DataValidationUtil.java
  parquetTypes.add(typeInfo.getParquetType());
  this.metadata.putAll(typeInfo.getMetadata());
  int columnIndex = parquetTypes.size() - 1;
  fieldIndex.put(
      column.getInternalName(),
-     new ParquetColumn(column, columnIndex, typeInfo.getPrimitiveTypeName()));
+     new ParquetColumn(column, columnIndex, typeInfo.getParquetType()));
Where do we validate that only primitive-typed typeInfos are allowed for FDN tables?
The FDN-type-to-Parquet-type function generateColumnParquetTypeInfo only returns primitive types. Should we add an assertion here?
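A minimal sketch of the assertion being discussed. The names below (TypeSketch, assertPrimitive) are illustrative stand-ins, not the SDK's or Parquet's real API:

```java
public class PrimitiveTypeAssertionSketch {
    // Illustrative stand-in for a Parquet type: a name plus a
    // primitive/group flag. Not the SDK's real type class.
    static final class TypeSketch {
        final String name;
        final boolean primitive;
        TypeSketch(String name, boolean primitive) {
            this.name = name;
            this.primitive = primitive;
        }
    }

    // The assertion under discussion: fail fast if a non-primitive
    // type is generated for an FDN column.
    static void assertPrimitive(TypeSketch type, String columnName) {
        if (!type.primitive) {
            throw new IllegalStateException(
                "Expected primitive Parquet type for FDN column " + columnName
                    + " but got " + type.name);
        }
    }

    public static void main(String[] args) {
        assertPrimitive(new TypeSketch("INT64", true), "COL1"); // passes silently
        try {
            assertPrimitive(new TypeSketch("GROUP", false), "COL2");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```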
src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java
src/main/java/net/snowflake/ingest/streaming/internal/ParquetTypeInfo.java
org.apache.iceberg.types.Type icebergDataType =
    deserializeIcebergType(column.getSourceIcebergDataType());
parquetType =
    typeToMessageType.primitive(icebergDataType.asPrimitiveType(), repetition, id, name);
We need error handling here for when a new Iceberg data type is returned by the service that the client doesn't know how to handle.
Added a check in IcebergDataTypeParser. If a new Iceberg data type is introduced on the server, Types.fromPrimitive will throw IllegalArgumentException; do you think this is sufficient?
src/main/java/net/snowflake/ingest/streaming/internal/ParquetTypeGenerator.java
 * GlobalServices/modules/data-lake/datalake-api/src/main/java/com/snowflake/metadata/iceberg
 * /IcebergDataTypeParser.java
 */
public class IcebergDataTypeParser {
public?
This class is in the utils package. It remains public since we need to use it from the internal package.
for next PR: this will allow customers to directly instantiate this class because it's public; let's discuss.
  JsonNode json = MAPPER.readTree(icebergDataType);
  return getTypeFromJson(json);
} catch (IOException e) {
  throw new SFException(ErrorCode.INTERNAL_ERROR, "Failed to deserialize Iceberg data type", e);
I see that in the FDN case we throw UNKNOWN_DATA_TYPE?
This exception is thrown on a JSON parse error; IMO this is an internal error, as it should not happen if the server side is passing a valid JSON string.
src/main/java/net/snowflake/ingest/utils/IcebergDataTypeParser.java
  }
}

throw new SFException(ErrorCode.INTERNAL_ERROR, "Cannot parse Iceberg type from: " + jsonNode);
- Instead of reserializing jsonNode, I'd pass in the original string and write that out to logs / the error message.
- (P2) It might be a good idea to only throw SFExceptions from "higher" layers and not from utility parsing classes like this one; IllegalArgumentException is probably good enough here. Let's chat with Toby on whether there's already a defined scheme for which levels can/should throw SFException.
Changed all exceptions to IllegalArgumentException in this class.
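A rough sketch of the agreed convention: utility-level parsing throws IllegalArgumentException for unknown type names. The mapping and method name below are illustrative only; the real parser walks a Jackson JsonNode and handles many more types:

```java
public class IcebergTypeNameSketch {
    // Hypothetical mapping of a few Iceberg primitive type names to
    // Parquet physical types, for illustration only.
    static String toParquetPhysicalType(String icebergType) {
        switch (icebergType) {
            case "int": return "INT32";
            case "long": return "INT64";
            case "double": return "DOUBLE";
            case "string": return "BINARY";
            default:
                // Per the review: a utility class throws
                // IllegalArgumentException rather than SFException.
                throw new IllegalArgumentException(
                    "Cannot parse Iceberg type from: " + icebergType);
        }
    }

    public static void main(String[] args) {
        System.out.println(toParquetPhysicalType("long")); // INT64
        try {
            toParquetPhysicalType("geometry"); // unknown -> throws
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```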
@sfc-gh-azagrebin @sfc-gh-lsembera could you help with reviewing Parquet writer and data type related changes in this PR? Thanks!
 */
static long validateAndParseIcebergLong(String columnName, Object input, long insertRowIndex) {
  if (input instanceof Number) {
    double value = ((Number) input).doubleValue();
This is a narrowing conversion, which loses precision, see:
long l1 = 499999999000000001L;
double d = l1;
long l2 = (long) d;
System.out.println(l1);
System.out.println(d);
System.out.println(l2);
yields:
499999999000000001
4.99999999E17
499999999000000000
It is better to use BigDecimal.
Switched to BigDecimal.
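A hedged sketch of the BigDecimal approach; the method name and rounding behavior here are assumptions for illustration, and the SDK's actual validator may differ:

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

public class LongValidationSketch {
    // Illustrative only: convert a Number to long exactly via BigDecimal,
    // avoiding the double narrowing loss shown in the review comment.
    // HALF_UP matches the 1.499f -> 1L expectation in the tests below;
    // longValueExact() throws ArithmeticException on overflow.
    static long toLongExact(Number input) {
        return new BigDecimal(input.toString())
            .setScale(0, RoundingMode.HALF_UP)
            .longValueExact();
    }

    public static void main(String[] args) {
        long l1 = 499999999000000001L;
        // The double path would yield 499999999000000000; this is exact.
        System.out.println(toLongExact(l1) == l1); // true
        System.out.println(toLongExact(1.499f));   // 1
    }
}
```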
src/main/java/net/snowflake/ingest/streaming/internal/DataValidationUtil.java
 * @param insertRowIndex Row index for error reporting
 * @return Parsed integer
 */
static int validateAndParseIcebergInt(String columnName, Object input, long insertRowIndex) {
Should we add unit tests for these functions, which focus on corner cases? Min and max values, Double/Float NaN+positive/negative infinity, big integers and big decimals outside of the allowed range, etc.?
Added tests here.
I reviewed the int/long validation in DataValidationUtil and it LGTM; I left two small comments for your consideration.
For BDEC ingestion, we have a suite of integration tests testing data types end to end, i.e. including server-side scanning. I will leave it up to you whether you need the same for Parquet ingestion.
public void testValidateAndParseIcebergLong() {
  assertEquals(1L, validateAndParseIcebergLong("COL", 1, 0));
  assertEquals(1L, validateAndParseIcebergLong("COL", 1L, 0));
  assertEquals(1L, validateAndParseIcebergLong("COL", 1.499f, 0));
Let's add a test for negative zero, as well. I believe it should be converted to positive zero.
cc @sfc-gh-azagrebin who was dealing with this issue.
Added a test for -.0f. ITs for all Iceberg types are expected to be added when server-side Iceberg file registration is ready; backlogged to Jira.
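To illustrate why negative zero deserves a test (plain Java, independent of the SDK's validator):

```java
public class NegativeZeroSketch {
    public static void main(String[] args) {
        // -0.0f equals 0.0f under ==, but the bit patterns differ,
        // so a naive pass-through would silently persist the sign bit.
        System.out.println(-0.0f == 0.0f); // true
        System.out.println(
            Float.floatToIntBits(-0.0f) == Float.floatToIntBits(0.0f)); // false

        // Converting to an integral type normalizes to positive zero,
        // which is the behavior the requested test should pin down.
        long asLong = (long) -0.0f;
        System.out.println(asLong); // 0
    }
}
```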
src/main/java/net/snowflake/ingest/streaming/internal/DataValidationUtil.java
src/main/java/net/snowflake/ingest/streaming/internal/OpenChannelRequestInternal.java
if (isIcebergMode
    && response.getTableColumns().stream()
        .anyMatch(c -> c.getSourceIcebergDataType() == null)) {
  throw new SFException(
for next PR: also log this out before throwing, and log the request id too.
default:
  if (column.getSourceIcebergDataType() != null) {
    parquetType =
        IcebergDataTypeParser.parseIcebergDataTypeStringToParquetType(
for next PR: add testcase and throw proper error if an unknown data type is encountered
src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java
@@ -261,6 +263,10 @@ Object getVectorValueAt(String column, int index) {
if (logicalType == ColumnLogicalType.BINARY && value != null) {
  value = value instanceof String ? ((String) value).getBytes(StandardCharsets.UTF_8) : value;
}
/* Mismatch between Iceberg string & FDN String */
if (Objects.equals(columnMetadata.getSourceIcebergDataType(), "\"string\"")) {
  value = value instanceof byte[] ? new String((byte[]) value, StandardCharsets.UTF_8) : value;
This is test-only code so no problem here, but otherwise assuming the byte[] is UTF-8 is a bit risky and shouldn't be done.
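To illustrate the risk being flagged (plain Java, not SDK code): decoding non-UTF-8 bytes as UTF-8 does not fail, it silently substitutes replacement characters.

```java
import java.nio.charset.StandardCharsets;

public class Utf8AssumptionSketch {
    public static void main(String[] args) {
        // "café" encoded as ISO-8859-1 uses the single byte 0xE9 for 'é',
        // which is not a valid UTF-8 sequence.
        byte[] latin1 = "café".getBytes(StandardCharsets.ISO_8859_1);

        // Decoding as UTF-8 does not throw; the invalid byte silently
        // becomes U+FFFD (the replacement character), corrupting the data.
        String decoded = new String(latin1, StandardCharsets.UTF_8);
        System.out.println(decoded.equals("café"));     // false
        System.out.println(decoded.contains("\uFFFD")); // true
    }
}
```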
    Types.primitive(PrimitiveType.PrimitiveTypeName.DOUBLE, repetition).id(id).named(name);
    break;
default:
  if (column.getSourceIcebergDataType() != null) {
for next PR: add validation that we only ever see column.sourceIcebergDataType as non-null when isIcebergMode is true. I saw you're already checking if this field is null on any column when we're in iceberg mode, need similar check for non-iceberg-mode.
Reviewed all files except the following (verified they're properly protected behind isIceberg flag, will get to them again later this week), signing off to unblock checkin.
IcebergParquetValueParser.java
IcebergDataTypeParser.java
DataValidationUtil.java
testcases
dataTypesToTest.add(new DataTypeInfo("\"date\"", Types.DateType.get()));
dataTypesToTest.add(new DataTypeInfo("\"time\"", Types.TimeType.get()));
dataTypesToTest.add(new DataTypeInfo("\"timestamptz\"", Types.TimestampType.withZone()));
dataTypesToTest.add(
The structured data type testing needs to cover more cases, and should also account for nested schema parsing.
Added to jira.
  if (val != null) {
-   String fieldName = cols.get(i).getPath()[0];
+   String fieldName = cols.get(i).getName();
What's the difference between getPath()[0] and getName()?
getPath()[0] always returns the root column name, while getName() returns the current column/subcolumn name. This doesn't matter for primitive data type columns, but it is needed for structured data types.
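The distinction can be sketched without the Parquet API; the arrays below stand in for what a column's path contains (the described behavior of getPath()/getName() is taken from the reply above, and the sketch itself is illustrative):

```java
public class ColumnPathSketch {
    public static void main(String[] args) {
        // For a flat primitive column, root and leaf coincide,
        // so getPath()[0] and getName() would agree.
        String[] flat = {"AGE"};
        System.out.println(flat[0]);                   // AGE (root)
        System.out.println(flat[flat.length - 1]);     // AGE (leaf)

        // For a structured column such as person.address.city they differ:
        // the root is the top-level field, the leaf is the subcolumn.
        String[] nested = {"person", "address", "city"};
        System.out.println(nested[0]);                 // person
        System.out.println(nested[nested.length - 1]); // city
    }
}
```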
Today we use only FDN-specific logical and physical data types. In this PR we change to use Iceberg's data types, so there is no loss of signal between the table schema on the server and the data type conversions done in the client.