-
Notifications
You must be signed in to change notification settings - Fork 57
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SNOW-1787322 Fix InsertError for structured data type #888
Conversation
* | ||
* @param missingNotNullColName the missing non-nullable column name | ||
*/ | ||
public void addMissingNotNullColName(String missingNotNullColName) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
any chance this can be called from multiple threads?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Insert error instance are independent between rows, I think this should be safe.
src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java
Outdated
Show resolved
Hide resolved
src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java
Outdated
Show resolved
Hide resolved
src/main/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParser.java
Outdated
Show resolved
Hide resolved
src/main/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParser.java
Outdated
Show resolved
Hide resolved
src/main/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParser.java
Show resolved
Hide resolved
error); | ||
listVal.add(parsedValue.getValue()); | ||
estimatedParquetSize += parsedValue.getSize(); | ||
} else { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this else block is (a) not incrementing estimatedParquetSize, and (b) not adding to listVal in one branch (when required=true). Unclear why you really need this else block?
You can also do the following I think and avoid the unnecessary if-else branching?
String fieldName = type.getFieldName(i);
Object val = structVal.getOrDefault(fieldName, null);
ParquetBufferValue parsedValue = parseColumnValueToParquet(val, ....);
listVal.add(parsedValue.getValue());
estParquetSize = ...;
if (type.getType(i).isRepetition(REQUIRED)) {
missingFields.add(fieldName);
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The reason to add this else is to distinguish between missing column and column with null value. That is, we cannot use getOrDefault(fieldName, null)
as this ruined the difference between null value and missing key.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah, got it. please add a comment around this.
src/main/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParser.java
Outdated
Show resolved
Hide resolved
src/main/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParser.java
Outdated
Show resolved
Hide resolved
src/main/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParser.java
Outdated
Show resolved
Hide resolved
d939c5a
to
af46e60
Compare
8924513
to
ba6cdba
Compare
} | ||
if (sb.length() > 0) { | ||
sb.append("."); | ||
} | ||
sb.append(p); | ||
sb.append(p.replace("\\", "\\\\").replace(".", "\\.")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does an empty string in p need to also be replaced by something? Have IT for an empty field name?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think empty string does not need to be replaced. An empty string does not collision with other dot path. An IT with empty fields was included in previous PR in IcebergStructuredIT.testFieldName
.
ba6cdba
to
960d35d
Compare
@@ -85,7 +85,7 @@ public void parseValueInt() { | |||
}; | |||
ParquetBufferValue pv = | |||
IcebergParquetValueParser.parseColumnValueToParquet( | |||
Integer.MAX_VALUE, type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0); | |||
Integer.MAX_VALUE, type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0, null); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks for not adding a method overload :)
"object(k1 int not null, k2 object(k3 int not null, k4 object(k5 int not null) not" | ||
+ " null) not null) not null"); | ||
SnowflakeStreamingIngestChannel channel = | ||
openChannel(tableName, OpenChannelRequest.OnErrorOption.ABORT); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks like a pre-existing gap where someone using OnErrorOption.ABORT will not get the same set of information (insertErrors object) as OnErrorOption.CONTINUE ? (unrelated to your PR)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For single row, errors on different OnErrorOption should be the same.
channel.insertRow(createStreamingIngestRow(row), UUID.randomUUID().toString()); | ||
assertThat(insertValidationResponse.getInsertErrors().size()).isEqualTo(1); | ||
assertThat(insertValidationResponse.getInsertErrors().get(0).getExtraColNames()) | ||
.containsOnly("VALUE.k2", "VALUE.k\\.3", "VALUE.k\\\\4"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FOR KC team: The first VALUE is the default column name used by the test harness, and doesn't have any semantics. This is unlike the key-value/list/element/value words that you'll see in the examples that follow.
Signing off but please hold off on merging until we get a positive ack from warsaw on this change being sufficient, slack conversation ongoing. |
960d35d
to
f622569
Compare
Currently the
InsertError
doesn't populate extra columns, missing columns, null value for non null columns inInsertValidationResponse
when ingesting structured data type to Iceberg tables. The PR is fixing this.We use parquet dot path with escaping dot and back slash character in field name to represent sub-columns. For example, column
x
inmap_col(string, object("a.b" array(object("c\d" object(x int)))))
has a pathMAP_COL.key_value.value.a\.b.list.element.c\\d.x