
SNOW-1465503 Check row count in Parquet footer before committing #784

Merged 4 commits on Jul 8, 2024
Changes from 2 commits
@@ -9,6 +9,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.Logging;
@@ -124,6 +125,8 @@ private SerializationResult serializeFromParquetWriteBuffers(

if (mergedChannelWriter != null) {
mergedChannelWriter.close();
this.verifyRowCounts(
"serializeFromParquetWriteBuffers", mergedChannelWriter, channelsDataPerTable, -1);
}
return new SerializationResult(
channelsMetadataList,
@@ -216,6 +219,9 @@ private SerializationResult serializeFromJavaObjects(
rows.forEach(parquetWriter::writeRow);
parquetWriter.close();

this.verifyRowCounts(
"serializeFromJavaObjects", parquetWriter, channelsDataPerTable, rows.size());
Comment from @sfc-gh-azagrebin (Contributor), Jul 1, 2024:
It would be good to use the rowCount calculated here (outside of verifyRowCounts) for the check, because this is what we eventually send to GS.
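A rough sketch of what this suggestion might look like (not part of the diff; rowCount here stands for the running total that this method already accumulates for the chunk metadata sent to GS, which is not visible in this hunk):

    // Hypothetical variant: check directly against the same rowCount that ends up in the
    // chunk metadata sent to GS, instead of re-deriving the total inside verifyRowCounts.
    if (parquetWriter.getRowsWritten() != rowCount) {
      throw new SFException(
          ErrorCode.INTERNAL_ERROR,
          "Row count mismatch: parquetTotalRowsWritten="
              + parquetWriter.getRowsWritten()
              + " rowCountForMetadata="
              + rowCount);
    }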


return new SerializationResult(
channelsMetadataList,
columnEpStatsMapCombined,
@@ -224,4 +230,64 @@ private SerializationResult serializeFromJavaObjects(
mergedData,
chunkMinMaxInsertTimeInMs);
}

/**
* Validates that the row count in metadata matches the row count in the Parquet footer and the
* row count written by the Parquet writer.
*
* @param serializationType Serialization type, used for logging purposes only
* @param writer Parquet writer writing the data
* @param channelsDataPerTable Channel data
* @param javaSerializationTotalRowCount Total row count when java object serialization is used.
Comment from a reviewer (Contributor):
What do you mean by "when java object serialization is used"? It looks like, from the code above, this is just the total row count?

Reply from the PR author (Contributor):
The SDK implements two types of row buffering. The default one buffers rows as lists of Java objects in memory and, during flush, iterates over all of them and writes them to Parquet. There is also an alternative implementation, called internal buffering, implemented by @sfc-gh-azagrebin, which doesn't collect any Java values but instead serializes rows directly to Parquet. So, for the default buffering method (Java objects), we have 4 sources of row counts that all have to match:

  1. size of the buffered List<List<Object>>
  2. number of rows written by the Parquet writer
  3. number of rows in the Parquet footer
  4. number of rows collected for metadata

For the internal buffering optimization, we only have the 2nd, 3rd, and 4th.
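For illustration only (not part of the diff), a minimal sketch of those four counts for the default path; the variable names are illustrative, and the check added below only throws when counts 2-4 disagree:

    // Hypothetical sketch of the four row-count sources described in the comment above.
    long bufferedRows = rows.size();                    // 1. buffered java objects
    long writtenRows = parquetWriter.getRowsWritten();  // 2. rows written by the Parquet writer
    long footerRows =
        parquetWriter.getRowCountsFromFooter().stream().mapToLong(Long::longValue).sum(); // 3. footer
    long metadataRows =
        channelsDataPerTable.stream().mapToLong(d -> d.getRowCount()).sum();              // 4. metadata
    boolean allCountsMatch =
        bufferedRows == writtenRows && writtenRows == footerRows && footerRows == metadataRows;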

* Used only for logging purposes if there is a mismatch.
*/
private void verifyRowCounts(
String serializationType,
BdecParquetWriter writer,
List<ChannelData<ParquetChunkData>> channelsDataPerTable,
long javaSerializationTotalRowCount) {
long parquetTotalRowsWritten = writer.getRowsWritten();

List<Long> parquetFooterRowsPerBlock = writer.getRowCountsFromFooter();
long parquetTotalRowsInFooter = 0;
for (long perBlockCount : parquetFooterRowsPerBlock) parquetTotalRowsInFooter += perBlockCount;

long totalRowsInMetadata = 0;
for (ChannelData<ParquetChunkData> channelData : channelsDataPerTable)
totalRowsInMetadata += channelData.getRowCount();
Comment from a reviewer (Contributor):
IIRC, best practice recommends always adding curly brackets.

Suggested change:
    for (ChannelData<ParquetChunkData> channelData : channelsDataPerTable) {
      totalRowsInMetadata += channelData.getRowCount();
    }


if (parquetTotalRowsInFooter != totalRowsInMetadata
|| parquetTotalRowsWritten != totalRowsInMetadata) {

final String perChannelRowCountsInMetadata =
Comment from a reviewer (Contributor):
Would it make sense to dump the whole List<ChannelData<ParquetChunkData>>, if ChannelData/ParquetChunkData get proper toString methods? Because otherwise it will not be clear which channel misbehaved.

Reply from the PR author (Contributor):
I added a log line with channel names. We should not log full channel data because it contains user data.
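    // (Sketch, not part of the diff.) The reply above mentions logging only channel names,
    // never full channel data, since channel data contains user rows. Assuming a hypothetical
    // accessor chain like getChannelContext().getName(), such a log string could be built the
    // same way as the joins below, e.g.:
    //   channelsDataPerTable.stream()
    //       .map(d -> d.getChannelContext().getName())
    //       .collect(Collectors.joining(","));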

channelsDataPerTable.stream()
.map(x -> String.valueOf(x.getRowCount()))
.collect(Collectors.joining(","));

final String perBlockRowCountsInFooter =
parquetFooterRowsPerBlock.stream().map(String::valueOf).collect(Collectors.joining(","));

final long channelsCountInMetadata = channelsDataPerTable.size();

throw new SFException(
Comment from a reviewer (Contributor):
Could you make sure this is being sent to Snowflake?

Reply from the PR author (Contributor):
What do you mean by "sent to Snowflake"? Is it possible via telemetry?

ErrorCode.INTERNAL_ERROR,
String.format(
"[%s]The number of rows in Parquet does not match the number of rows in metadata. "
+ "parquetTotalRowsInFooter=%d "
+ "totalRowsInMetadata=%d "
+ "parquetTotalRowsWritten=%d "
+ "perChannelRowCountsInMetadata=%s "
+ "perBlockRowCountsInFooter=%s "
+ "channelsCountInMetadata=%d "
+ "countOfSerializedJavaObjects=%d",
serializationType,
parquetTotalRowsInFooter,
totalRowsInMetadata,
parquetTotalRowsWritten,
perChannelRowCountsInMetadata,
perBlockRowCountsInFooter,
channelsCountInMetadata,
javaSerializationTotalRowCount));
}
}
}
17 changes: 17 additions & 0 deletions src/main/java/org/apache/parquet/hadoop/BdecParquetWriter.java
@@ -6,6 +6,7 @@

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import net.snowflake.ingest.utils.Constants;
@@ -17,6 +18,7 @@
import org.apache.parquet.column.values.factory.DefaultV1ValuesWriterFactory;
import org.apache.parquet.crypto.FileEncryptionProperties;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.io.DelegatingPositionOutputStream;
import org.apache.parquet.io.OutputFile;
import org.apache.parquet.io.ParquetEncodingException;
@@ -35,6 +37,7 @@
public class BdecParquetWriter implements AutoCloseable {
private final InternalParquetRecordWriter<List<Object>> writer;
private final CodecFactory codecFactory;
private long rowsWritten = 0;
Comment from a reviewer (Contributor), on the added line rowsWritten = 0;:
Also in close()?

Reply from the PR author (Contributor):
Closing the writer doesn't clean up the written rows, so we shouldn't reset the counter. For example, the Parquet footer is only accessible after closing the writer.
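For illustration only (not part of the diff), a hypothetical helper sketching the lifecycle described in the reply; the constructor arguments mirror the new test added below, and both counters are read only after close():

    // Hypothetical sketch: the row counts are queried after close(), so close() must not reset them.
    static void writeAndCheck(ByteArrayOutputStream stream, MessageType schema, List<Object> row)
        throws IOException {
      BdecParquetWriter writer =
          new BdecParquetWriter(
              stream, schema, new HashMap<>(), "CHANNEL", 1000, Constants.BdecParquetCompression.GZIP);
      writer.writeRow(row);
      writer.close(); // finalizes the Parquet footer
      long written = writer.getRowsWritten(); // still reflects the rows written before close()
      long inFooter =
          writer.getRowCountsFromFooter().stream().mapToLong(Long::longValue).sum(); // footer is now readable
      if (written != inFooter) {
        throw new IllegalStateException("row count mismatch: written=" + written + " inFooter=" + inFooter);
      }
    }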


/**
* Creates a BDEC specific parquet writer.
@@ -100,14 +103,28 @@ public BdecParquetWriter(
encodingProps);
}

/** @return List of row counts per block stored in the parquet footer */
public List<Long> getRowCountsFromFooter() {
final List<Long> blockRowCounts = new ArrayList<>();
for (BlockMetaData metadata : writer.getFooter().getBlocks()) {
blockRowCounts.add(metadata.getRowCount());
}
return blockRowCounts;
}

public void writeRow(List<Object> row) {
try {
writer.write(row);
rowsWritten++;
} catch (InterruptedException | IOException e) {
throw new SFException(ErrorCode.INTERNAL_ERROR, "parquet row write failed", e);
}
}

public long getRowsWritten() {
return rowsWritten;
}

@Override
public void close() throws IOException {
try {
@@ -0,0 +1,133 @@
package net.snowflake.ingest.streaming.internal;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.Pair;
import net.snowflake.ingest.utils.SFException;
import org.apache.parquet.hadoop.BdecParquetWriter;
import org.apache.parquet.schema.MessageType;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

public class BlobBuilderTest {

@Test
public void testSerializationErrors() throws Exception {
// Construction succeeds if both data and metadata contain 1 row
BlobBuilder.constructBlobAndMetadata(
"a.bdec",
Collections.singletonList(createChannelDataPerTable(1, false)),
Constants.BdecVersion.THREE);
BlobBuilder.constructBlobAndMetadata(
"a.bdec",
Collections.singletonList(createChannelDataPerTable(1, true)),
Constants.BdecVersion.THREE);

// Construction fails if metadata contains 0 rows and data 1 row
try {
BlobBuilder.constructBlobAndMetadata(
"a.bdec",
Collections.singletonList(createChannelDataPerTable(0, false)),
Constants.BdecVersion.THREE);
Assert.fail("Should not pass enableParquetInternalBuffering=false");
} catch (SFException e) {
Assert.assertEquals(ErrorCode.INTERNAL_ERROR.getMessageCode(), e.getVendorCode());
Assert.assertTrue(e.getMessage().contains("serializeFromJavaObjects"));
Assert.assertTrue(e.getMessage().contains("parquetTotalRowsInFooter=1"));
Assert.assertTrue(e.getMessage().contains("totalRowsInMetadata=0"));
Assert.assertTrue(e.getMessage().contains("parquetTotalRowsWritten=1"));
Assert.assertTrue(e.getMessage().contains("perChannelRowCountsInMetadata=0"));
Assert.assertTrue(e.getMessage().contains("perBlockRowCountsInFooter=1"));
Assert.assertTrue(e.getMessage().contains("channelsCountInMetadata=1"));
Assert.assertTrue(e.getMessage().contains("countOfSerializedJavaObjects=1"));
}

try {
BlobBuilder.constructBlobAndMetadata(
"a.bdec",
Collections.singletonList(createChannelDataPerTable(0, true)),
Constants.BdecVersion.THREE);
Assert.fail("Should not pass enableParquetInternalBuffering=true");
} catch (SFException e) {
Assert.assertEquals(ErrorCode.INTERNAL_ERROR.getMessageCode(), e.getVendorCode());
Assert.assertTrue(e.getMessage().contains("serializeFromParquetWriteBuffers"));
Assert.assertTrue(e.getMessage().contains("parquetTotalRowsInFooter=1"));
Assert.assertTrue(e.getMessage().contains("totalRowsInMetadata=0"));
Assert.assertTrue(e.getMessage().contains("parquetTotalRowsWritten=1"));
Assert.assertTrue(e.getMessage().contains("perChannelRowCountsInMetadata=0"));
Assert.assertTrue(e.getMessage().contains("perBlockRowCountsInFooter=1"));
Assert.assertTrue(e.getMessage().contains("channelsCountInMetadata=1"));
Assert.assertTrue(e.getMessage().contains("countOfSerializedJavaObjects=-1"));
}
}

/**
* Creates channel data with a configurable number of rows in metadata and 1 physical row (used
* both with and without the internal buffering optimization).
*/
private List<ChannelData<ParquetChunkData>> createChannelDataPerTable(
int metadataRowCount, boolean enableParquetInternalBuffering) throws IOException {
String columnName = "C1";
ChannelData<ParquetChunkData> channelData = Mockito.spy(new ChannelData<>());
MessageType schema = createSchema(columnName);
Mockito.doReturn(
new ParquetFlusher(
schema,
enableParquetInternalBuffering,
100L,
Constants.BdecParquetCompression.GZIP))
.when(channelData)
.createFlusher();

channelData.setRowSequencer(1L);
ByteArrayOutputStream stream = new ByteArrayOutputStream();
BdecParquetWriter bdecParquetWriter =
new BdecParquetWriter(
stream,
schema,
new HashMap<>(),
"CHANNEL",
1000,
Constants.BdecParquetCompression.GZIP);
bdecParquetWriter.writeRow(Collections.singletonList("1"));
channelData.setVectors(
new ParquetChunkData(
Collections.singletonList(Collections.singletonList("A")),
bdecParquetWriter,
stream,
new HashMap<>()));
channelData.setColumnEps(new HashMap<>());
channelData.setRowCount(metadataRowCount);
channelData.setMinMaxInsertTimeInMs(new Pair<>(2L, 3L));

channelData.getColumnEps().putIfAbsent(columnName, new RowBufferStats(columnName, null, 1));
channelData.setChannelContext(
new ChannelFlushContext("channel1", "DB", "SCHEMA", "TABLE", 1L, "enc", 1L));
return Collections.singletonList(channelData);
}

private static MessageType createSchema(String columnName) {
ParquetTypeGenerator.ParquetTypeInfo c1 =
ParquetTypeGenerator.generateColumnParquetTypeInfo(createTestTextColumn(columnName), 1);
return new MessageType("bdec", Collections.singletonList(c1.getParquetType()));
}

private static ColumnMetadata createTestTextColumn(String name) {
ColumnMetadata colChar = new ColumnMetadata();
colChar.setOrdinal(1);
colChar.setName(name);
colChar.setPhysicalType("LOB");
colChar.setNullable(true);
colChar.setLogicalType("TEXT");
colChar.setByteLength(14);
colChar.setLength(11);
colChar.setScale(0);
return colChar;
}
}