SNOW-1659373 remove serializeFromParquetBuffers
sfc-gh-gdoci committed Sep 13, 2024
1 parent d69adfd commit 9833aad
Showing 6 changed files with 9 additions and 152 deletions.
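
For orientation before the per-file hunks: the net effect of this commit is that ParquetFlusher always serializes from buffered Java objects. Below is a minimal sketch of the class as it stands after the change, assembled from the hunks in this diff; it leans on the SDK's internal types (Flusher, ChannelData, ParquetChunkData, SerializationResult), so treat it as a sketch rather than a compilable file.

import java.io.IOException;
import java.util.List;
import net.snowflake.ingest.utils.Constants;
import org.apache.parquet.schema.MessageType;

// Sketch: the constructor no longer takes enableParquetInternalBuffering,
// and serialize() has a single code path.
public class ParquetFlusher implements Flusher<ParquetChunkData> {
  private final MessageType schema;
  private final long maxChunkSizeInBytes;
  private final Constants.BdecParquetCompression bdecParquetCompression;

  public ParquetFlusher(
      MessageType schema,
      long maxChunkSizeInBytes,
      Constants.BdecParquetCompression bdecParquetCompression) {
    this.schema = schema;
    this.maxChunkSizeInBytes = maxChunkSizeInBytes;
    this.bdecParquetCompression = bdecParquetCompression;
  }

  @Override
  public SerializationResult serialize(
      List<ChannelData<ParquetChunkData>> channelsDataPerTable, String filePath)
      throws IOException {
    // The serializeFromParquetWriteBuffers path is removed by this commit;
    // rows are always serialized from buffered Java objects.
    return serializeFromJavaObjects(channelsDataPerTable, filePath);
  }

  private SerializationResult serializeFromJavaObjects(
      List<ChannelData<ParquetChunkData>> channelsDataPerTable, String filePath)
      throws IOException {
    // Elided here; the real body is largely unchanged by this commit
    // (see the serializeFromJavaObjects hunks below).
    throw new UnsupportedOperationException("sketch only");
  }
}
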
@@ -10,8 +10,6 @@
/** Channel's buffer relevant parameters that are set at the owning client level. */
public class ClientBufferParameters {

private boolean enableParquetInternalBuffering;

private long maxChunkSizeInBytes;

private long maxAllowedRowSizeInBytes;
@@ -23,18 +21,14 @@ public class ClientBufferParameters {
/**
* Private constructor used for test methods
*
* @param enableParquetInternalBuffering flag whether buffering in internal Parquet buffers is
* enabled
* @param maxChunkSizeInBytes maximum chunk size in bytes
* @param maxAllowedRowSizeInBytes maximum row size in bytes
*/
private ClientBufferParameters(
boolean enableParquetInternalBuffering,
long maxChunkSizeInBytes,
long maxAllowedRowSizeInBytes,
Constants.BdecParquetCompression bdecParquetCompression,
boolean enableNewJsonParsingLogic) {
this.enableParquetInternalBuffering = enableParquetInternalBuffering;
this.maxChunkSizeInBytes = maxChunkSizeInBytes;
this.maxAllowedRowSizeInBytes = maxAllowedRowSizeInBytes;
this.bdecParquetCompression = bdecParquetCompression;
@@ -43,10 +37,6 @@ private ClientBufferParameters(

/** @param clientInternal reference to the client object where the relevant parameters are set */
public ClientBufferParameters(SnowflakeStreamingIngestClientInternal clientInternal) {
this.enableParquetInternalBuffering =
clientInternal != null
? clientInternal.getParameterProvider().getEnableParquetInternalBuffering()
: ParameterProvider.ENABLE_PARQUET_INTERNAL_BUFFERING_DEFAULT;
this.maxChunkSizeInBytes =
clientInternal != null
? clientInternal.getParameterProvider().getMaxChunkSizeInBytes()
@@ -67,30 +57,22 @@ public ClientBufferParameters(SnowflakeStreamingIngestClientInter
}

/**
* @param enableParquetInternalBuffering flag whether buffering in internal Parquet buffers is
* enabled
* @param maxChunkSizeInBytes maximum chunk size in bytes
* @param maxAllowedRowSizeInBytes maximum row size in bytes
* @return ClientBufferParameters object
*/
public static ClientBufferParameters test_createClientBufferParameters(
boolean enableParquetInternalBuffering,
long maxChunkSizeInBytes,
long maxAllowedRowSizeInBytes,
Constants.BdecParquetCompression bdecParquetCompression,
boolean enableNewJsonParsingLogic) {
return new ClientBufferParameters(
enableParquetInternalBuffering,
maxChunkSizeInBytes,
maxAllowedRowSizeInBytes,
bdecParquetCompression,
enableNewJsonParsingLogic);
}

public boolean getEnableParquetInternalBuffering() {
return enableParquetInternalBuffering;
}

public long getMaxChunkSizeInBytes() {
return maxChunkSizeInBytes;
}
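At the call sites, the test factory shrinks from five arguments to four. A hedged sketch of the updated usage follows; the constant names mirror the row-buffer test hunk at the bottom of this commit and are assumed to resolve in the test's scope, and the final boolean (enableNewJsonParsingLogic) is an illustrative value.

// Sketch: test_createClientBufferParameters without the removed
// enableParquetInternalBuffering flag.
ClientBufferParameters bufferParameters =
    ClientBufferParameters.test_createClientBufferParameters(
        MAX_CHUNK_SIZE_IN_BYTES_DEFAULT,
        MAX_ALLOWED_ROW_SIZE_IN_BYTES_DEFAULT,
        Constants.BdecParquetCompression.GZIP,
        true /* enableNewJsonParsingLogic, illustrative */);
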
@@ -15,7 +15,6 @@
import net.snowflake.ingest.utils.Logging;
import net.snowflake.ingest.utils.Pair;
import net.snowflake.ingest.utils.SFException;
import org.apache.parquet.hadoop.BdecParquetReader;
import org.apache.parquet.hadoop.BdecParquetWriter;
import org.apache.parquet.schema.MessageType;

@@ -26,121 +25,29 @@
public class ParquetFlusher implements Flusher<ParquetChunkData> {
private static final Logging logger = new Logging(ParquetFlusher.class);
private final MessageType schema;
private final boolean enableParquetInternalBuffering;
private final long maxChunkSizeInBytes;

private final Constants.BdecParquetCompression bdecParquetCompression;

/**
* Construct parquet flusher from its schema and set flag that indicates whether Parquet memory
* optimization is enabled, i.e. rows will be buffered in internal Parquet buffer.
* Construct parquet flusher from its schema.
*/
public ParquetFlusher(
MessageType schema,
boolean enableParquetInternalBuffering,
long maxChunkSizeInBytes,
Constants.BdecParquetCompression bdecParquetCompression) {
this.schema = schema;
this.enableParquetInternalBuffering = enableParquetInternalBuffering;
this.maxChunkSizeInBytes = maxChunkSizeInBytes;
this.bdecParquetCompression = bdecParquetCompression;
}

@Override
public SerializationResult serialize(
List<ChannelData<ParquetChunkData>> channelsDataPerTable, String filePath)
List<ChannelData<ParquetChunkData>> channelsDataPerTable, String filePath)
throws IOException {
if (enableParquetInternalBuffering) {
return serializeFromParquetWriteBuffers(channelsDataPerTable, filePath);
}
return serializeFromJavaObjects(channelsDataPerTable, filePath);
}

private SerializationResult serializeFromParquetWriteBuffers(
List<ChannelData<ParquetChunkData>> channelsDataPerTable, String filePath)
throws IOException {
List<ChannelMetadata> channelsMetadataList = new ArrayList<>();
long rowCount = 0L;
float chunkEstimatedUncompressedSize = 0f;
String firstChannelFullyQualifiedTableName = null;
Map<String, RowBufferStats> columnEpStatsMapCombined = null;
BdecParquetWriter mergedChannelWriter = null;
ByteArrayOutputStream mergedChunkData = new ByteArrayOutputStream();
Pair<Long, Long> chunkMinMaxInsertTimeInMs = null;

for (ChannelData<ParquetChunkData> data : channelsDataPerTable) {
// Create channel metadata
ChannelMetadata channelMetadata =
ChannelMetadata.builder()
.setOwningChannelFromContext(data.getChannelContext())
.setRowSequencer(data.getRowSequencer())
.setOffsetToken(data.getEndOffsetToken())
.setStartOffsetToken(data.getStartOffsetToken())
.build();
// Add channel metadata to the metadata list
channelsMetadataList.add(channelMetadata);

logger.logDebug(
"Parquet Flusher: Start building channel={}, rowCount={}, bufferSize={} in blob={}",
data.getChannelContext().getFullyQualifiedName(),
data.getRowCount(),
data.getBufferSize(),
filePath);

if (mergedChannelWriter == null) {
columnEpStatsMapCombined = data.getColumnEps();
mergedChannelWriter = data.getVectors().parquetWriter;
mergedChunkData = data.getVectors().output;
firstChannelFullyQualifiedTableName = data.getChannelContext().getFullyQualifiedTableName();
chunkMinMaxInsertTimeInMs = data.getMinMaxInsertTimeInMs();
} else {
// This method assumes that channelsDataPerTable is grouped by table. We double check
// here and throw an error if the assumption is violated
if (!data.getChannelContext()
.getFullyQualifiedTableName()
.equals(firstChannelFullyQualifiedTableName)) {
throw new SFException(ErrorCode.INVALID_DATA_IN_CHUNK);
}

columnEpStatsMapCombined =
ChannelData.getCombinedColumnStatsMap(columnEpStatsMapCombined, data.getColumnEps());
data.getVectors().parquetWriter.close();
BdecParquetReader.readFileIntoWriter(
data.getVectors().output.toByteArray(), mergedChannelWriter);
chunkMinMaxInsertTimeInMs =
ChannelData.getCombinedMinMaxInsertTimeInMs(
chunkMinMaxInsertTimeInMs, data.getMinMaxInsertTimeInMs());
}

rowCount += data.getRowCount();
chunkEstimatedUncompressedSize += data.getBufferSize();

logger.logDebug(
"Parquet Flusher: Finish building channel={}, rowCount={}, bufferSize={} in blob={}",
data.getChannelContext().getFullyQualifiedName(),
data.getRowCount(),
data.getBufferSize(),
filePath);
}

if (mergedChannelWriter != null) {
mergedChannelWriter.close();
this.verifyRowCounts(
"serializeFromParquetWriteBuffers",
mergedChannelWriter,
rowCount,
channelsDataPerTable,
-1);
}
return new SerializationResult(
channelsMetadataList,
columnEpStatsMapCombined,
rowCount,
chunkEstimatedUncompressedSize,
mergedChunkData,
chunkMinMaxInsertTimeInMs);
}

private SerializationResult serializeFromJavaObjects(
List<ChannelData<ParquetChunkData>> channelsDataPerTable, String filePath)
throws IOException {
@@ -167,21 +74,19 @@ private SerializationResult serializeFromJavaObjects(
channelsMetadataList.add(channelMetadata);

logger.logDebug(
"Parquet Flusher: Start building channel={}, rowCount={}, bufferSize={} in blob={},"
+ " enableParquetMemoryOptimization={}",
"Parquet Flusher: Start building channel={}, rowCount={}, bufferSize={} in blob={}",
data.getChannelContext().getFullyQualifiedName(),
data.getRowCount(),
data.getBufferSize(),
filePath,
enableParquetInternalBuffering);
filePath);

if (rows == null) {
columnEpStatsMapCombined = data.getColumnEps();
rows = new ArrayList<>();
firstChannelFullyQualifiedTableName = data.getChannelContext().getFullyQualifiedTableName();
chunkMinMaxInsertTimeInMs = data.getMinMaxInsertTimeInMs();
} else {
// This method assumes that channelsDataPerTable is grouped by table. We double check
// This method assumes that channelsDataPerTable is grouped by table. We double-check
// here and throw an error if the assumption is violated
if (!data.getChannelContext()
.getFullyQualifiedTableName()
@@ -202,13 +107,11 @@ private SerializationResult serializeFromJavaObjects(
chunkEstimatedUncompressedSize += data.getBufferSize();

logger.logDebug(
"Parquet Flusher: Finish building channel={}, rowCount={}, bufferSize={} in blob={},"
+ " enableParquetMemoryOptimization={}",
"Parquet Flusher: Finish building channel={}, rowCount={}, bufferSize={} in blob={}",
data.getChannelContext().getFullyQualifiedName(),
data.getRowCount(),
data.getBufferSize(),
filePath,
enableParquetInternalBuffering);
filePath);
}

Map<String, String> metadata = channelsDataPerTable.get(0).getVectors().metadata;
@@ -227,8 +130,7 @@ private SerializationResult serializeFromJavaObjects(
rows.forEach(parquetWriter::writeRow);
parquetWriter.close();

this.verifyRowCounts(
"serializeFromJavaObjects", parquetWriter, rowCount, channelsDataPerTable, rows.size());
this.verifyRowCounts(parquetWriter, rowCount, channelsDataPerTable, rows.size());

return new SerializationResult(
channelsMetadataList,
@@ -243,15 +145,13 @@ private SerializationResult serializeFromJavaObjects(
* Validates that rows count in metadata matches the row count in Parquet footer and the row count
* written by the parquet writer
*
* @param serializationType Serialization type, used for logging purposes only
* @param writer Parquet writer writing the data
* @param channelsDataPerTable Channel data
* @param totalMetadataRowCount Row count calculated during metadata collection
* @param javaSerializationTotalRowCount Total row count when java object serialization is used.
* Used only for logging purposes if there is a mismatch.
*/
private void verifyRowCounts(
String serializationType,
BdecParquetWriter writer,
long totalMetadataRowCount,
List<ChannelData<ParquetChunkData>> channelsDataPerTable,
@@ -285,7 +185,7 @@ private void verifyRowCounts(
throw new SFException(
ErrorCode.INTERNAL_ERROR,
String.format(
"[%s]The number of rows in Parquet does not match the number of rows in metadata. "
"The number of rows in Parquet does not match the number of rows in metadata. "
+ "parquetTotalRowsInFooter=%d "
+ "totalMetadataRowCount=%d "
+ "parquetTotalRowsWritten=%d "
Expand All @@ -294,7 +194,6 @@ private void verifyRowCounts(
+ "channelsCountInMetadata=%d "
+ "countOfSerializedJavaObjects=%d "
+ "channelNames=%s",
serializationType,
parquetTotalRowsInFooter,
totalMetadataRowCount,
parquetTotalRowsWritten,
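One detail worth noting in the verifyRowCounts hunk above: removing the [%s] prefix from the format string only works because serializationType is also dropped from String.format's argument list; otherwise every later placeholder would shift by one argument. A small self-contained illustration of that pairing, with hypothetical names and values rather than the SDK's:

// Hypothetical stand-in for the pattern above: when a "%s" placeholder is
// removed from the template, its matching argument must be removed as well.
long rowsInFooter = 10;    // placeholder value, not a real count
long rowsInMetadata = 10;  // placeholder value, not a real count

// Before: a leading "[%s]" consumed the first argument.
String before = String.format(
    "[%s]Rows mismatch. footer=%d metadata=%d",
    "serializeFromJavaObjects", rowsInFooter, rowsInMetadata);

// After: the label placeholder and its argument are removed together.
String after = String.format(
    "Rows mismatch. footer=%d metadata=%d",
    rowsInFooter, rowsInMetadata);
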
@@ -329,7 +329,6 @@ void closeInternal() {
public Flusher<ParquetChunkData> createFlusher() {
return new ParquetFlusher(
schema,
clientBufferParameters.getEnableParquetInternalBuffering(),
clientBufferParameters.getMaxChunkSizeInBytes(),
clientBufferParameters.getBdecParquetCompression());
}
21 changes: 0 additions & 21 deletions src/main/java/net/snowflake/ingest/utils/ParameterProvider.java
@@ -30,8 +30,6 @@ public class ParameterProvider {
public static final String BLOB_UPLOAD_MAX_RETRY_COUNT =
"BLOB_UPLOAD_MAX_RETRY_COUNT".toLowerCase();
public static final String MAX_MEMORY_LIMIT_IN_BYTES = "MAX_MEMORY_LIMIT_IN_BYTES".toLowerCase();
public static final String ENABLE_PARQUET_INTERNAL_BUFFERING =
"ENABLE_PARQUET_INTERNAL_BUFFERING".toLowerCase();
// This should not be needed once we have the ability to track size at table/chunk level
public static final String MAX_CHANNEL_SIZE_IN_BYTES = "MAX_CHANNEL_SIZE_IN_BYTES".toLowerCase();
public static final String MAX_CHUNK_SIZE_IN_BYTES = "MAX_CHUNK_SIZE_IN_BYTES".toLowerCase();
@@ -79,10 +77,6 @@ public class ParameterProvider {
/* Iceberg mode parameters: When streaming to Iceberg mode, different default parameters are required because it generates Parquet files instead of BDEC files. */
public static final int MAX_CHUNKS_IN_BLOB_ICEBERG_MODE_DEFAULT = 1;

/* Parameter that enables using internal Parquet buffers for buffering of rows before serializing.
It reduces memory consumption compared to using Java Objects for buffering.*/
public static final boolean ENABLE_PARQUET_INTERNAL_BUFFERING_DEFAULT = false;

public static final boolean ENABLE_NEW_JSON_PARSING_LOGIC_DEFAULT = true;

/** Map of parameter name to parameter value. This will be set by client/configure API Call. */
@@ -212,13 +206,6 @@ private void setParameterMap(
props,
false);

this.checkAndUpdate(
ENABLE_PARQUET_INTERNAL_BUFFERING,
ENABLE_PARQUET_INTERNAL_BUFFERING_DEFAULT,
parameterOverrides,
props,
false);

this.checkAndUpdate(
MAX_CHANNEL_SIZE_IN_BYTES,
MAX_CHANNEL_SIZE_IN_BYTES_DEFAULT,
@@ -440,14 +427,6 @@ public long getMaxMemoryLimitInBytes() {
return (val instanceof String) ? Long.parseLong(val.toString()) : (long) val;
}

/** @return Return whether memory optimization for Parquet is enabled. */
public boolean getEnableParquetInternalBuffering() {
Object val =
this.parameterMap.getOrDefault(
ENABLE_PARQUET_INTERNAL_BUFFERING, ENABLE_PARQUET_INTERNAL_BUFFERING_DEFAULT);
return (val instanceof String) ? Boolean.parseBoolean(val.toString()) : (boolean) val;
}

/** @return The max channel size in bytes */
public long getMaxChannelSizeInBytes() {
Object val =
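For context on the getter removed in this file: ParameterProvider resolves values with a getOrDefault-then-parse pattern, visible in the surviving long-valued getters above; the deleted getEnableParquetInternalBuffering() followed the same shape for booleans. A minimal self-contained sketch of that pattern, using a hypothetical key and default rather than the removed constants:

import java.util.HashMap;
import java.util.Map;

// Minimal illustration of the lookup pattern used by the removed boolean getter.
// The key and default below are hypothetical; the real provider lowercases its
// keys and fills the map via checkAndUpdate(...).
class ParameterLookupSketch {
  private static final String SOME_BOOLEAN_PARAMETER = "some_boolean_parameter";
  private static final boolean SOME_BOOLEAN_PARAMETER_DEFAULT = false;

  private final Map<String, Object> parameterMap = new HashMap<>();

  boolean getSomeBooleanParameter() {
    Object val =
        parameterMap.getOrDefault(SOME_BOOLEAN_PARAMETER, SOME_BOOLEAN_PARAMETER_DEFAULT);
    // Values can arrive as Strings (e.g. from java.util.Properties), so parse if needed.
    return (val instanceof String) ? Boolean.parseBoolean(val.toString()) : (boolean) val;
  }
}
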
@@ -96,7 +96,6 @@ private List<ChannelData<ParquetChunkData>> createChannelDataPerTable(
Mockito.doReturn(
new ParquetFlusher(
schema,
enableParquetInternalBuffering,
100L,
Constants.BdecParquetCompression.GZIP))
.when(channelData)
@@ -129,7 +129,6 @@ private AbstractRowBuffer<?> createTestBuffer(OpenChannelRequest.OnErrorOption o
rs -> {},
initialState,
ClientBufferParameters.test_createClientBufferParameters(
enableParquetMemoryOptimization,
MAX_CHUNK_SIZE_IN_BYTES_DEFAULT,
MAX_ALLOWED_ROW_SIZE_IN_BYTES_DEFAULT,
Constants.BdecParquetCompression.GZIP,
