
SNOW-1465503 Check row count in Parquet footer before committing #784

Merged
4 commits merged into master from lsembera/row-count-check on Jul 8, 2024

Conversation

sfc-gh-lsembera (Contributor) commented Jun 25, 2024:

This PR adds an additional safety check that verifies whether the number of rows in the Parquet footer matches the number of metadata rows we've collected. If they don't match, an internal exception is thrown.

sfc-gh-lsembera force-pushed the lsembera/row-count-check branch from 02de7dc to 0d7d47c on June 26, 2024 at 11:41
sfc-gh-lsembera marked this pull request as ready for review on June 26, 2024 at 11:48
sfc-gh-lsembera requested review from sfc-gh-tzhang and a team as code owners on June 26, 2024 at 11:48

// We check if the number of rows collectively written for channels encountered so far matches
// the number of rows in metadata
if (mergedChannelWriter.getRowsWritten() != rowCount) {
Contributor:

Can this be potentially racy i.e. other threads writing data via the writer concurrently? Or is the mergedChannelWriter only used from this current thread?

sfc-gh-lsembera (Author):

This cannot be racy: serialization happens in a single background thread, and no other thread can modify the buffer anymore.

@@ -35,6 +36,7 @@
public class BdecParquetWriter implements AutoCloseable {
private final InternalParquetRecordWriter<List<Object>> writer;
private final CodecFactory codecFactory;
private final AtomicLong rowsWritten = new AtomicLong(0);
Contributor:

Can it be just a volatile? This goes back to my previous question about races. If there is no concurrent access (but non-overlapping access from different threads), then volatile would be sufficient.

sfc-gh-lsembera (Author):

In fact, in this case we don't even need volatile. The field is accessed either within a lock, or in a background thread that has a happens-before relationship with the spawning thread.
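
A minimal standalone sketch of that happens-before guarantee (hypothetical demo class, not SDK code): java.util.concurrent documents that actions in a thread prior to submitting a task to an Executor happen-before the task's execution begins, so a plain field written before the hand-off is visible in the background task without volatile.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class HandoffVisibilityDemo {
  // Plain field: written only before the hand-off, read only in the BG task.
  private long rowsWritten = 0;

  public static void main(String[] args) throws Exception {
    HandoffVisibilityDemo demo = new HandoffVisibilityDemo();
    demo.rowsWritten = 42; // write in the spawning thread

    ExecutorService bg = Executors.newSingleThreadExecutor();
    // submit() establishes the happens-before edge, so the task sees 42.
    bg.submit(() -> System.out.println("rowsWritten = " + demo.rowsWritten)).get();
    bg.shutdown();
  }
}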

Comment on lines 163 to 171
/**
* Safety check to verify whether the number of rows in the parquet footer matches the number of
* rows in metadata
*/
static <T> void verifyRowCounts(
Flusher.SerializationResult serializationResult,
List<ChannelData<T>> channelsDataPerTable,
byte[] paddedChunkData,
int chunkLength) {
Collaborator:

Another idea would be to add the check in the Parquet flusher, inside the serializeFromX methods, right before we return the serialization result. We can get the per-block row counts directly from the parquetWriter (through a new method), since the writer has access to the footer. That way we wouldn't need to read the chunk back and create a new BdecInputFile.
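
Something along these lines (sketch only; the method name is illustrative, and it assumes the caller can obtain the parquet-mr ParquetMetadata footer after close()):

import java.util.ArrayList;
import java.util.List;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;

// Collect per-row-group row counts straight from the footer the writer
// already holds after close(), instead of re-reading the serialized chunk.
static List<Long> rowCountsFromFooter(ParquetMetadata footer) {
  List<Long> rowCounts = new ArrayList<>();
  for (BlockMetaData block : footer.getBlocks()) { // one entry per row group
    rowCounts.add(block.getRowCount());
  }
  return rowCounts;
}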

sfc-gh-lsembera (Author):

Thanks, this simplifies the code pretty significantly. I updated the PR.

sfc-gh-lsembera force-pushed the lsembera/row-count-check branch 2 times, most recently from 75e6f66 to c4ae63c on June 28, 2024 at 20:44
* @param serializationType Serialization type, used for logging purposes only
* @param writer Parquet writer writing the data
* @param channelsDataPerTable Channel data
* @param javaSerializationTotalRowCount Total row count when java object serialization is used.
Contributor:

What do you mean by "when java object serialization is used"? From the code above, it looks like this is just the total row count.

sfc-gh-lsembera (Author):

The SDK implements two types of row buffering. The default one buffers rows as lists of Java objects in memory; during flush, it iterates over all of them and writes them to Parquet. There is also an alternative implementation called internal buffering, implemented by @sfc-gh-azagrebin, which doesn't collect any Java values but instead serializes rows directly to Parquet. So, for the default buffering method (Java objects), we have 4 sources of row counts that all have to match:

  1. size of the buffered List<List<Object>>
  2. number of rows written by the Parquet writer
  3. number of rows in Parquet footer
  4. number of rows collected for metadata

For the internal buffering optimization, we only have the 2nd, 3rd, and 4th.
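
A minimal sketch of the cross-check for the default path (hypothetical names, not the SDK's actual signatures):

// All four row counts must agree; any mismatch means rows were dropped or
// duplicated somewhere between buffering and serialization.
static void verifyRowCountsSketch(
    long bufferedRowCount,   // 1. size of the buffered List<List<Object>>
    long writerRowCount,     // 2. rows written by the Parquet writer
    long footerRowCount,     // 3. rows recorded in the Parquet footer
    long metadataRowCount) { // 4. rows collected for chunk metadata
  if (bufferedRowCount != metadataRowCount
      || writerRowCount != metadataRowCount
      || footerRowCount != metadataRowCount) {
    throw new IllegalStateException(
        String.format(
            "Row count mismatch: buffered=%d, writer=%d, footer=%d, metadata=%d",
            bufferedRowCount, writerRowCount, footerRowCount, metadataRowCount));
  }
}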

sfc-gh-gdoci (Collaborator) left a comment:

Thank you, lgtm.

sfc-gh-lsembera force-pushed the lsembera/row-count-check branch from 1b240e6 to d39ca44 on July 1, 2024 at 10:32
sfc-gh-azagrebin (Contributor) left a comment:

Thanks Lukas!

if (parquetTotalRowsInFooter != totalRowsInMetadata
|| parquetTotalRowsWritten != totalRowsInMetadata) {

final String perChannelRowCountsInMetadata =
Contributor:

Would it make sense to dump the whole List<ChannelData<ParquetChunkData>>, if ChannelData/ParquetChunkData got proper toString methods? Otherwise it will not be clear which channel misbehaved.

sfc-gh-lsembera (Author):

I added a log line with channel names. We should not log the full channel data because it contains user data.
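
A sketch of that kind of log helper (the getter chain is an assumption about the SDK's types):

import java.util.List;
import java.util.stream.Collectors;

// Join only the channel names for logging; never the buffered row values,
// which contain customer data.
static <T> String channelNamesForLog(List<ChannelData<T>> channelsDataPerTable) {
  return channelsDataPerTable.stream()
      .map(channelData -> channelData.getChannelContext().getName())
      .collect(Collectors.joining(", "));
}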

@@ -35,6 +37,7 @@
public class BdecParquetWriter implements AutoCloseable {
private final InternalParquetRecordWriter<List<Object>> writer;
private final CodecFactory codecFactory;
private long rowsWritten = 0;
Contributor:

rowsWritten = 0;

Should this also happen in close()?

sfc-gh-lsembera (Author):

Closing the writer doesn't clean up the written rows, so we shouldn't reset the counter there. For example, the Parquet footer is only accessible after closing the writer.

@@ -216,6 +219,9 @@ private SerializationResult serializeFromJavaObjects(
rows.forEach(parquetWriter::writeRow);
parquetWriter.close();

this.verifyRowCounts(
"serializeFromJavaObjects", parquetWriter, channelsDataPerTable, rows.size());
sfc-gh-azagrebin (Contributor) commented Jul 1, 2024:

It would be good for the check to use the rowCount calculated here, outside of verifyRowCounts, because that is what we eventually send to GS.

sfc-gh-tzhang (Contributor) left a comment:

LGTM. Please make sure the error is being sent to SF, to avoid the case where the customer doesn't have the log. Thanks!


final long channelsCountInMetadata = channelsDataPerTable.size();

throw new SFException(
Contributor:

Could you make sure this is being sent to Snowflake?

sfc-gh-lsembera (Author):

What do you mean by sent to Snowflake? Is it possible via telemetry?

Comment on lines 256 to 257
for (ChannelData<ParquetChunkData> channelData : channelsDataPerTable)
totalRowsInMetadata += channelData.getRowCount();
Contributor:

IIRC, best practice recommends always adding curly braces.

Suggested change (add braces):

for (ChannelData<ParquetChunkData> channelData : channelsDataPerTable) {
  totalRowsInMetadata += channelData.getRowCount();
}

sfc-gh-lsembera merged commit 9254771 into master on Jul 8, 2024
48 checks passed
sfc-gh-lsembera deleted the lsembera/row-count-check branch on July 8, 2024 at 09:20
sfc-gh-kgaputis pushed a commit to sfc-gh-kgaputis/snowflake-ingest-java that referenced this pull request Sep 12, 2024