SNOW-1727532 Set number of values for repeated fields #861
src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java
src/main/java/net/snowflake/ingest/streaming/internal/FileColumnProperties.java
  this.maxChunkSizeInBytes = maxChunkSizeInBytes;
  this.maxAllowedRowSizeInBytes = maxAllowedRowSizeInBytes;
  this.bdecParquetCompression = bdecParquetCompression;
  this.enableNewJsonParsingLogic = enableNewJsonParsingLogic;
  this.maxRowGroups = maxRowGroups;
  this.isIcebergMode = isIcebergMode;
  this.enableDistinctValuesCount = enableDistinctValuesCount;
  this.enableValuesCount = enableValuesCount;
}

/** @param clientInternal reference to the client object where the relevant parameters are set */
I've never understood why clientInternal can or should be allowed to be null. Looks like a testcase author leaking their convenience needs into production code.
The clientInterTest needs this.
there's another ctor for testcases to pass in whatever booleans they want, so still unclear why there's a need to pass in a null clientInternal to this ctor.
src/main/java/net/snowflake/ingest/streaming/internal/BlobBuilder.java
final int magicOffset = bytes.length - ParquetFileWriter.MAGIC.length;
final int footerSizeOffset = magicOffset - Integer.BYTES;
if (footerSizeOffset < 0
    || !ParquetFileWriter.MAGIC_STR.equals(
Curious: why is this MAGIC_STR comparison being added only now, and why is it needed only when calculating the ext metadata size?
Addressing this comment
@@ -427,7 +432,7 @@ Set<String> verifyInputColumns(
   List<String> missingCols = new ArrayList<>();
   for (String columnName : this.nonNullableFieldNames) {
     if (!inputColNamesMap.containsKey(columnName)) {
-      missingCols.add(statsMap.get(columnName).getColumnDisplayName());
+      missingCols.add(fieldIndex.get(columnName).columnMetadata.getName());
can we remove the getColumnDisplayName() method from the RowBufferStats class now?
actually, is there a need to even make this change?
If a root column is a structured data type, statsMap.get(columnName).getColumnDisplayName() will throw an NPE, because we only store leaf column stats. Use fieldIndex instead for logging to avoid this.
please add this as a code comment.
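The failure mode discussed in this thread can be sketched roughly as follows. The class and field names here (`MissingColumnNames`, `FieldEntry`, `name`) are hypothetical stand-ins rather than the SDK's actual types; the sketch only assumes what the thread states, namely that the stats map holds leaf columns only while the field index also has an entry for each root column.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the lookup discussed above.
final class MissingColumnNames {

  // Stand-in for a field-index entry; the real entry carries column metadata.
  static final class FieldEntry {
    final String name;

    FieldEntry(String name) {
      this.name = name;
    }
  }

  // Collects display names of non-nullable columns missing from the input row.
  // Names come from the field index, which is assumed to contain every root
  // column, including structured ones that have no entry in a leaf-only stats map.
  static List<String> missingColumns(
      List<String> nonNullableFieldNames,
      Map<String, ?> inputColumns,
      Map<String, FieldEntry> fieldIndex) {
    List<String> missing = new ArrayList<>();
    for (String columnName : nonNullableFieldNames) {
      if (!inputColumns.containsKey(columnName)) {
        missing.add(fieldIndex.get(columnName).name);
      }
    }
    return missing;
  }
}
```

With a structured root column such as `ADDRESS`, a leaf-only stats map would contain keys for its sub-columns but not for `ADDRESS` itself, so dereferencing the result of a lookup by root name is what fails.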
src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java
@@ -49,6 +48,7 @@ public class ParquetRowBuffer extends AbstractRowBuffer<ParquetChunkData> {
   private final ParquetProperties.WriterVersion parquetWriterVersion;

   private MessageType schema;
+  private SubColumnFinder statsFinder;
rename to subColumnFinder
src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java
-statsMap.get(columnName).incCurrentNullCount();
+statsFinder
+    .getSubColumns(columnName)
+    .forEach(subColumn -> statsMap.get(subColumn).incCurrentNullCount());
nullref if statsMap.get(subColumn) returns null.
can that happen?
No, as both subColumnFinder and statsMap are built from the same schema. Added a check here just in case.
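One way such a defensive check could look is sketched below. This is an illustration under stated assumptions, not the code added in the PR: the map-based sub-column lookup and the `Long` counters are hypothetical simplifications, and whether the real check throws or skips is not shown in this thread.

```java
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model: per-leaf null counts keyed by sub-column name.
final class SubColumnNullCounts {

  // Increments the null count of every leaf under rootColumn and fails loudly
  // if the sub-column lookup and the stats map ever disagree.
  static void incrementNullCounts(
      String rootColumn,
      Map<String, List<String>> subColumns,
      Map<String, Long> nullCounts) {
    List<String> leaves = subColumns.get(rootColumn);
    if (leaves == null) {
      throw new IllegalStateException("Unknown column: " + rootColumn);
    }
    for (String leaf : leaves) {
      if (!nullCounts.containsKey(leaf)) {
        throw new IllegalStateException("No stats for sub-column: " + leaf);
      }
      nullCounts.merge(leaf, 1L, Long::sum);
    }
  }
}
```

Throwing an explicit exception with the column name makes a schema mismatch easier to diagnose than a bare null dereference inside a lambda.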
   * @return the footer size
   */
  public static long getParquetFooterSize(byte[] bytes) throws IOException {
    final int magicOffset = bytes.length - ParquetFileWriter.MAGIC.length;
curious, does this handle both PARE and PAR1 ?
I guess they're both 4 bytes so the length won't change.
Yes, they have the same length. Added a check for PARE.
            new String(bytes, magicOffset, ParquetFileWriter.MAGIC.length)));
    }

    return BytesUtils.readIntLittleEndian(bytes, footerSizeOffset);
is it always little endian irrespective of parquet V1 and parquet V2? How can we add a test that ensures this method is correct?
Yes, V1 vs. V2 only affects the data pages (ref). For testing, I think we can use serializedBytes.length - writer.getColumnIndexReference().getOffset() - metadataSize - ParquetFileWriter.MAGIC.length - footerSizeBytes(4) = extendedMetadataSize. What do you think?
Added test for parquet footer size & extendedMetadataSize in BlobBuilderTest
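The trailer layout this thread relies on (a 4-byte little-endian length followed by a 4-byte magic, `PAR1` or `PARE`) can be illustrated with a dependency-free sketch. The class and method names (`ParquetFooterSketch`, `footerSize`) are hypothetical; this is not the PR's `getParquetFooterSize`, which uses `ParquetFileWriter` and `BytesUtils`.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

// Hypothetical standalone sketch of reading the length field from a Parquet trailer.
final class ParquetFooterSketch {
  private static final int MAGIC_LENGTH = 4;

  // Returns the 4-byte little-endian length stored immediately before the
  // trailing magic bytes, after validating that the magic is PAR1 or PARE.
  static int footerSize(byte[] bytes) {
    int magicOffset = bytes.length - MAGIC_LENGTH;
    int footerSizeOffset = magicOffset - Integer.BYTES;
    if (footerSizeOffset < 0) {
      throw new IllegalArgumentException("Too short to be a Parquet file");
    }
    String magic = new String(bytes, magicOffset, MAGIC_LENGTH, StandardCharsets.US_ASCII);
    if (!magic.equals("PAR1") && !magic.equals("PARE")) {
      throw new IllegalArgumentException("Invalid Parquet magic: " + magic);
    }
    return ByteBuffer.wrap(bytes, footerSizeOffset, Integer.BYTES)
        .order(ByteOrder.LITTLE_ENDIAN)
        .getInt();
  }
}
```

A test along these lines can feed hand-built byte arrays whose length field is known, which checks the byte order independently of any Parquet writer.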
@@ -313,9 +352,27 @@ private float addRow(
       RowBufferStats.getCombinedStats(statsMap.get(columnName), forkedColStats.getValue()));
oh wow this looks like a thread unsafe operation here. RowBuffer.statsMap and tempStatsMap are both constructed as non-concurrent collections, but we don't prohibit concurrent insertRows on one channel!
(nothing to fix for now; we'll follow up on this separately)
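For context on the concern raised here, the sketch below shows one general way to make per-key stat merging safe under concurrent writers, using `ConcurrentHashMap.merge`. It is not the planned follow-up and not SDK code: the class name, the column name `C1`, and the use of plain `Long` counters in place of `RowBufferStats` are all assumptions made for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration: several writers update one per-column counter.
final class ConcurrentStatsMerge {

  // Runs `threads` writers that each record `rowsPerThread` nulls for column C1
  // and returns the resulting counts.
  static Map<String, Long> mergeConcurrently(int threads, int rowsPerThread) {
    Map<String, Long> nullCounts = new ConcurrentHashMap<>();
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    for (int t = 0; t < threads; t++) {
      pool.execute(
          () -> {
            for (int i = 0; i < rowsPerThread; i++) {
              // merge() is performed atomically per key on ConcurrentHashMap,
              // so concurrent increments are not lost.
              nullCounts.merge("C1", 1L, Long::sum);
            }
          });
    }
    pool.shutdown();
    try {
      if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
        throw new IllegalStateException("Writers did not finish in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return nullCounts;
  }
}
```

With a plain `HashMap`, the same get-combine-put sequence can lose updates or corrupt the map when two threads insert rows on one channel at once; other remedies, such as locking around the row buffer, would also address that.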
src/main/java/net/snowflake/ingest/streaming/internal/RowBufferStats.java
LGTM!
This PR includes the following changes:
- numberOfValues for repeated fields (e.g. map, list), used in EP file registration.
- extendedMetadataSize and metadataSize fields for EP registration.