SNOW-1659373 cleanup serializeFromParquetBuffers (#829)
SNOW-1659373 remove serializeFromParquetBuffers
sfc-gh-gdoci authored Sep 16, 2024
1 parent c063036 commit 3cf3eb5
Showing 7 changed files with 19 additions and 262 deletions.
ClientBufferParameters.java
@@ -10,8 +10,6 @@
/** Channel's buffer relevant parameters that are set at the owning client level. */
public class ClientBufferParameters {

private boolean enableParquetInternalBuffering;

private long maxChunkSizeInBytes;

private long maxAllowedRowSizeInBytes;
@@ -23,18 +21,14 @@ public class ClientBufferParameters {
/**
* Private constructor used for test methods
*
* @param enableParquetInternalBuffering flag whether buffering in internal Parquet buffers is
* enabled
* @param maxChunkSizeInBytes maximum chunk size in bytes
* @param maxAllowedRowSizeInBytes maximum row size in bytes
*/
private ClientBufferParameters(
boolean enableParquetInternalBuffering,
long maxChunkSizeInBytes,
long maxAllowedRowSizeInBytes,
Constants.BdecParquetCompression bdecParquetCompression,
boolean enableNewJsonParsingLogic) {
this.enableParquetInternalBuffering = enableParquetInternalBuffering;
this.maxChunkSizeInBytes = maxChunkSizeInBytes;
this.maxAllowedRowSizeInBytes = maxAllowedRowSizeInBytes;
this.bdecParquetCompression = bdecParquetCompression;
@@ -43,10 +37,6 @@ private ClientBufferParameters(

/** @param clientInternal reference to the client object where the relevant parameters are set */
public ClientBufferParameters(SnowflakeStreamingIngestClientInternal clientInternal) {
this.enableParquetInternalBuffering =
clientInternal != null
? clientInternal.getParameterProvider().getEnableParquetInternalBuffering()
: ParameterProvider.ENABLE_PARQUET_INTERNAL_BUFFERING_DEFAULT;
this.maxChunkSizeInBytes =
clientInternal != null
? clientInternal.getParameterProvider().getMaxChunkSizeInBytes()
@@ -67,30 +57,22 @@ public ClientBufferParameters(SnowflakeStreamingIngestClientInter
}

/**
* @param enableParquetInternalBuffering flag whether buffering in internal Parquet buffers is
* enabled
* @param maxChunkSizeInBytes maximum chunk size in bytes
* @param maxAllowedRowSizeInBytes maximum row size in bytes
* @return ClientBufferParameters object
*/
public static ClientBufferParameters test_createClientBufferParameters(
boolean enableParquetInternalBuffering,
long maxChunkSizeInBytes,
long maxAllowedRowSizeInBytes,
Constants.BdecParquetCompression bdecParquetCompression,
boolean enableNewJsonParsingLogic) {
return new ClientBufferParameters(
enableParquetInternalBuffering,
maxChunkSizeInBytes,
maxAllowedRowSizeInBytes,
bdecParquetCompression,
enableNewJsonParsingLogic);
}

public boolean getEnableParquetInternalBuffering() {
return enableParquetInternalBuffering;
}

public long getMaxChunkSizeInBytes() {
return maxChunkSizeInBytes;
}
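
For reference, this is how a test might now obtain a ClientBufferParameters instance after the flag removal. It is an illustrative sketch, not code from the commit: the numeric values, the GZIP compression constant, and the import paths are assumptions.

import net.snowflake.ingest.streaming.internal.ClientBufferParameters;
import net.snowflake.ingest.utils.Constants;

class ClientBufferParametersSketch {
  static ClientBufferParameters forTest() {
    // The leading enableParquetInternalBuffering boolean is gone; only the four
    // remaining arguments are passed. Values are illustrative, not library defaults.
    return ClientBufferParameters.test_createClientBufferParameters(
        128L * 1024 * 1024,                     // maxChunkSizeInBytes
        64L * 1024 * 1024,                      // maxAllowedRowSizeInBytes
        Constants.BdecParquetCompression.GZIP,  // assumed compression constant
        true);                                  // enableNewJsonParsingLogic
  }
}
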
ParquetChunkData.java
@@ -4,37 +4,24 @@

package net.snowflake.ingest.streaming.internal;

import java.io.ByteArrayOutputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.parquet.hadoop.BdecParquetWriter;

/** Parquet data holder to buffer rows. */
public class ParquetChunkData {
// buffered rows serialized into Java objects. Needed for the Parquet w/o memory optimization.
final List<List<Object>> rows;

final BdecParquetWriter parquetWriter;
final ByteArrayOutputStream output;
final Map<String, String> metadata;

/**
* Construct parquet data chunk.
*
* @param rows buffered row data as a list
* @param parquetWriter buffered parquet row data
* @param output byte array file output
* @param metadata chunk metadata
*/
public ParquetChunkData(
List<List<Object>> rows,
BdecParquetWriter parquetWriter,
ByteArrayOutputStream output,
Map<String, String> metadata) {
public ParquetChunkData(List<List<Object>> rows, Map<String, String> metadata) {
this.rows = rows;
this.parquetWriter = parquetWriter;
this.output = output;
// create a defensive copy of the parameter map because the argument map passed here
// may currently be shared across multiple threads.
this.metadata = createDefensiveCopy(metadata);
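
After this change ParquetChunkData only buffers the row values and the chunk metadata; the BdecParquetWriter and ByteArrayOutputStream fields are gone. A minimal sketch of building a chunk with the two-argument constructor; the row values and the metadata entry are made up for illustration:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import net.snowflake.ingest.streaming.internal.ParquetChunkData;

class ParquetChunkDataSketch {
  static ParquetChunkData buildChunk() {
    List<List<Object>> rows = new ArrayList<>();
    rows.add(Arrays.asList((Object) 1, "a")); // row 1: two column values, illustrative
    rows.add(Arrays.asList((Object) 2, "b")); // row 2

    Map<String, String> metadata = new HashMap<>();
    metadata.put("exampleKey", "exampleValue"); // hypothetical metadata entry

    // The constructor takes a defensive copy of the metadata map, so later
    // mutation of `metadata` by another thread does not affect the chunk.
    return new ParquetChunkData(rows, metadata);
  }
}
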
ParquetFlusher.java
@@ -15,7 +15,6 @@
import net.snowflake.ingest.utils.Logging;
import net.snowflake.ingest.utils.Pair;
import net.snowflake.ingest.utils.SFException;
import org.apache.parquet.hadoop.BdecParquetReader;
import org.apache.parquet.hadoop.BdecParquetWriter;
import org.apache.parquet.schema.MessageType;

@@ -26,22 +25,16 @@
public class ParquetFlusher implements Flusher<ParquetChunkData> {
private static final Logging logger = new Logging(ParquetFlusher.class);
private final MessageType schema;
private final boolean enableParquetInternalBuffering;
private final long maxChunkSizeInBytes;

private final Constants.BdecParquetCompression bdecParquetCompression;

/**
* Construct parquet flusher from its schema and set flag that indicates whether Parquet memory
* optimization is enabled, i.e. rows will be buffered in internal Parquet buffer.
*/
/** Construct parquet flusher from its schema. */
public ParquetFlusher(
MessageType schema,
boolean enableParquetInternalBuffering,
long maxChunkSizeInBytes,
Constants.BdecParquetCompression bdecParquetCompression) {
this.schema = schema;
this.enableParquetInternalBuffering = enableParquetInternalBuffering;
this.maxChunkSizeInBytes = maxChunkSizeInBytes;
this.bdecParquetCompression = bdecParquetCompression;
}
@@ -50,97 +43,9 @@ public ParquetFlusher(
public SerializationResult serialize(
List<ChannelData<ParquetChunkData>> channelsDataPerTable, String filePath)
throws IOException {
if (enableParquetInternalBuffering) {
return serializeFromParquetWriteBuffers(channelsDataPerTable, filePath);
}
return serializeFromJavaObjects(channelsDataPerTable, filePath);
}

private SerializationResult serializeFromParquetWriteBuffers(
List<ChannelData<ParquetChunkData>> channelsDataPerTable, String filePath)
throws IOException {
List<ChannelMetadata> channelsMetadataList = new ArrayList<>();
long rowCount = 0L;
float chunkEstimatedUncompressedSize = 0f;
String firstChannelFullyQualifiedTableName = null;
Map<String, RowBufferStats> columnEpStatsMapCombined = null;
BdecParquetWriter mergedChannelWriter = null;
ByteArrayOutputStream mergedChunkData = new ByteArrayOutputStream();
Pair<Long, Long> chunkMinMaxInsertTimeInMs = null;

for (ChannelData<ParquetChunkData> data : channelsDataPerTable) {
// Create channel metadata
ChannelMetadata channelMetadata =
ChannelMetadata.builder()
.setOwningChannelFromContext(data.getChannelContext())
.setRowSequencer(data.getRowSequencer())
.setOffsetToken(data.getEndOffsetToken())
.setStartOffsetToken(data.getStartOffsetToken())
.build();
// Add channel metadata to the metadata list
channelsMetadataList.add(channelMetadata);

logger.logDebug(
"Parquet Flusher: Start building channel={}, rowCount={}, bufferSize={} in blob={}",
data.getChannelContext().getFullyQualifiedName(),
data.getRowCount(),
data.getBufferSize(),
filePath);

if (mergedChannelWriter == null) {
columnEpStatsMapCombined = data.getColumnEps();
mergedChannelWriter = data.getVectors().parquetWriter;
mergedChunkData = data.getVectors().output;
firstChannelFullyQualifiedTableName = data.getChannelContext().getFullyQualifiedTableName();
chunkMinMaxInsertTimeInMs = data.getMinMaxInsertTimeInMs();
} else {
// This method assumes that channelsDataPerTable is grouped by table. We double check
// here and throw an error if the assumption is violated
if (!data.getChannelContext()
.getFullyQualifiedTableName()
.equals(firstChannelFullyQualifiedTableName)) {
throw new SFException(ErrorCode.INVALID_DATA_IN_CHUNK);
}

columnEpStatsMapCombined =
ChannelData.getCombinedColumnStatsMap(columnEpStatsMapCombined, data.getColumnEps());
data.getVectors().parquetWriter.close();
BdecParquetReader.readFileIntoWriter(
data.getVectors().output.toByteArray(), mergedChannelWriter);
chunkMinMaxInsertTimeInMs =
ChannelData.getCombinedMinMaxInsertTimeInMs(
chunkMinMaxInsertTimeInMs, data.getMinMaxInsertTimeInMs());
}

rowCount += data.getRowCount();
chunkEstimatedUncompressedSize += data.getBufferSize();

logger.logDebug(
"Parquet Flusher: Finish building channel={}, rowCount={}, bufferSize={} in blob={}",
data.getChannelContext().getFullyQualifiedName(),
data.getRowCount(),
data.getBufferSize(),
filePath);
}

if (mergedChannelWriter != null) {
mergedChannelWriter.close();
this.verifyRowCounts(
"serializeFromParquetWriteBuffers",
mergedChannelWriter,
rowCount,
channelsDataPerTable,
-1);
}
return new SerializationResult(
channelsMetadataList,
columnEpStatsMapCombined,
rowCount,
chunkEstimatedUncompressedSize,
mergedChunkData,
chunkMinMaxInsertTimeInMs);
}

private SerializationResult serializeFromJavaObjects(
List<ChannelData<ParquetChunkData>> channelsDataPerTable, String filePath)
throws IOException {
@@ -167,21 +72,19 @@ private SerializationResult serializeFromJavaObjects(
channelsMetadataList.add(channelMetadata);

logger.logDebug(
"Parquet Flusher: Start building channel={}, rowCount={}, bufferSize={} in blob={},"
+ " enableParquetMemoryOptimization={}",
"Parquet Flusher: Start building channel={}, rowCount={}, bufferSize={} in blob={}",
data.getChannelContext().getFullyQualifiedName(),
data.getRowCount(),
data.getBufferSize(),
filePath,
enableParquetInternalBuffering);
filePath);

if (rows == null) {
columnEpStatsMapCombined = data.getColumnEps();
rows = new ArrayList<>();
firstChannelFullyQualifiedTableName = data.getChannelContext().getFullyQualifiedTableName();
chunkMinMaxInsertTimeInMs = data.getMinMaxInsertTimeInMs();
} else {
// This method assumes that channelsDataPerTable is grouped by table. We double check
// This method assumes that channelsDataPerTable is grouped by table. We double-check
// here and throw an error if the assumption is violated
if (!data.getChannelContext()
.getFullyQualifiedTableName()
@@ -202,13 +105,11 @@ private SerializationResult serializeFromJavaObjects(
chunkEstimatedUncompressedSize += data.getBufferSize();

logger.logDebug(
"Parquet Flusher: Finish building channel={}, rowCount={}, bufferSize={} in blob={},"
+ " enableParquetMemoryOptimization={}",
"Parquet Flusher: Finish building channel={}, rowCount={}, bufferSize={} in blob={}",
data.getChannelContext().getFullyQualifiedName(),
data.getRowCount(),
data.getBufferSize(),
filePath,
enableParquetInternalBuffering);
filePath);
}

Map<String, String> metadata = channelsDataPerTable.get(0).getVectors().metadata;
@@ -227,8 +128,7 @@ private SerializationResult serializeFromJavaObjects(
rows.forEach(parquetWriter::writeRow);
parquetWriter.close();

this.verifyRowCounts(
"serializeFromJavaObjects", parquetWriter, rowCount, channelsDataPerTable, rows.size());
this.verifyRowCounts(parquetWriter, rowCount, channelsDataPerTable, rows.size());

return new SerializationResult(
channelsMetadataList,
@@ -243,15 +143,13 @@ private SerializationResult serializeFromJavaObjects(
* Validates that rows count in metadata matches the row count in Parquet footer and the row count
* written by the parquet writer
*
* @param serializationType Serialization type, used for logging purposes only
* @param writer Parquet writer writing the data
* @param channelsDataPerTable Channel data
* @param totalMetadataRowCount Row count calculated during metadata collection
* @param javaSerializationTotalRowCount Total row count when java object serialization is used.
* Used only for logging purposes if there is a mismatch.
*/
private void verifyRowCounts(
String serializationType,
BdecParquetWriter writer,
long totalMetadataRowCount,
List<ChannelData<ParquetChunkData>> channelsDataPerTable,
@@ -285,7 +183,7 @@ private void verifyRowCounts(
throw new SFException(
ErrorCode.INTERNAL_ERROR,
String.format(
"[%s]The number of rows in Parquet does not match the number of rows in metadata. "
"The number of rows in Parquet does not match the number of rows in metadata. "
+ "parquetTotalRowsInFooter=%d "
+ "totalMetadataRowCount=%d "
+ "parquetTotalRowsWritten=%d "
Expand All @@ -294,7 +192,6 @@ private void verifyRowCounts(
+ "channelsCountInMetadata=%d "
+ "countOfSerializedJavaObjects=%d "
+ "channelNames=%s",
serializationType,
parquetTotalRowsInFooter,
totalMetadataRowCount,
parquetTotalRowsWritten,
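
With the internal-buffering path deleted, ParquetFlusher is constructed without the enableParquetInternalBuffering flag. A rough sketch of the new constructor call; the schema, chunk size, compression constant, and import paths for the internal classes are assumptions for illustration:

import net.snowflake.ingest.streaming.internal.ParquetFlusher;
import net.snowflake.ingest.utils.Constants;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

class ParquetFlusherSketch {
  static ParquetFlusher build() {
    // Hypothetical one-column schema, only so the constructor has something to take.
    MessageType schema =
        MessageTypeParser.parseMessageType("message bdec { optional binary C1; }");

    // The enableParquetInternalBuffering argument is gone from the constructor.
    return new ParquetFlusher(
        schema,
        128L * 1024 * 1024,                     // maxChunkSizeInBytes, illustrative
        Constants.BdecParquetCompression.GZIP); // assumed compression constant
  }
}

After this cleanup, serialize(channelsDataPerTable, filePath) has a single code path: it concatenates the buffered Java-object rows from every channel of the table, writes them through one BdecParquetWriter, and then verifies the row counts.
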
(Diffs for the remaining four changed files are not loaded here.)
