From 9833aad2f8acd7a1c49449524fa87cb57c9d3730 Mon Sep 17 00:00:00 2001
From: Gloria Doci
Date: Fri, 13 Sep 2024 11:14:23 +0200
Subject: [PATCH] SNOW-1659373 remove serializeFromParquetBuffers

---
 .../internal/ClientBufferParameters.java      |  18 ---
 .../streaming/internal/ParquetFlusher.java    | 119 ++----------------
 .../streaming/internal/ParquetRowBuffer.java  |   1 -
 .../ingest/utils/ParameterProvider.java       |  21 ----
 .../streaming/internal/BlobBuilderTest.java   |   1 -
 .../streaming/internal/RowBufferTest.java     |   1 -
 6 files changed, 9 insertions(+), 152 deletions(-)

diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java b/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java
index ac05c814e..c73337d1e 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java
@@ -10,8 +10,6 @@
 /** Channel's buffer relevant parameters that are set at the owning client level. */
 public class ClientBufferParameters {
-  private boolean enableParquetInternalBuffering;
-
   private long maxChunkSizeInBytes;
 
   private long maxAllowedRowSizeInBytes;
 
@@ -23,18 +21,14 @@ public class ClientBufferParameters {
   /**
    * Private constructor used for test methods
    *
-   * @param enableParquetInternalBuffering flag whether buffering in internal Parquet buffers is
-   *     enabled
    * @param maxChunkSizeInBytes maximum chunk size in bytes
    * @param maxAllowedRowSizeInBytes maximum row size in bytes
    */
   private ClientBufferParameters(
-      boolean enableParquetInternalBuffering,
       long maxChunkSizeInBytes,
       long maxAllowedRowSizeInBytes,
       Constants.BdecParquetCompression bdecParquetCompression,
       boolean enableNewJsonParsingLogic) {
-    this.enableParquetInternalBuffering = enableParquetInternalBuffering;
     this.maxChunkSizeInBytes = maxChunkSizeInBytes;
     this.maxAllowedRowSizeInBytes = maxAllowedRowSizeInBytes;
     this.bdecParquetCompression = bdecParquetCompression;
@@ -43,10 +37,6 @@ private ClientBufferParameters(
 
   /** @param clientInternal reference to the client object where the relevant parameters are set */
   public ClientBufferParameters(SnowflakeStreamingIngestClientInternal clientInternal) {
-    this.enableParquetInternalBuffering =
-        clientInternal != null
-            ? clientInternal.getParameterProvider().getEnableParquetInternalBuffering()
-            : ParameterProvider.ENABLE_PARQUET_INTERNAL_BUFFERING_DEFAULT;
     this.maxChunkSizeInBytes =
         clientInternal != null
             ? clientInternal.getParameterProvider().getMaxChunkSizeInBytes()
@@ -67,30 +57,22 @@ public ClientBufferParameters(SnowflakeStreamingIngestClientInternal clientInter
   }
 
   /**
-   * @param enableParquetInternalBuffering flag whether buffering in internal Parquet buffers is
-   *     enabled
    * @param maxChunkSizeInBytes maximum chunk size in bytes
    * @param maxAllowedRowSizeInBytes maximum row size in bytes
    * @return ClientBufferParameters object
    */
   public static ClientBufferParameters test_createClientBufferParameters(
-      boolean enableParquetInternalBuffering,
       long maxChunkSizeInBytes,
       long maxAllowedRowSizeInBytes,
       Constants.BdecParquetCompression bdecParquetCompression,
       boolean enableNewJsonParsingLogic) {
     return new ClientBufferParameters(
-        enableParquetInternalBuffering,
         maxChunkSizeInBytes,
         maxAllowedRowSizeInBytes,
         bdecParquetCompression,
         enableNewJsonParsingLogic);
   }
 
-  public boolean getEnableParquetInternalBuffering() {
-    return enableParquetInternalBuffering;
-  }
-
   public long getMaxChunkSizeInBytes() {
     return maxChunkSizeInBytes;
   }
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java
index d338a6a7b..f3f2ad054 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java
@@ -15,7 +15,6 @@
 import net.snowflake.ingest.utils.Logging;
 import net.snowflake.ingest.utils.Pair;
 import net.snowflake.ingest.utils.SFException;
-import org.apache.parquet.hadoop.BdecParquetReader;
 import org.apache.parquet.hadoop.BdecParquetWriter;
 import org.apache.parquet.schema.MessageType;
 
@@ -26,121 +25,29 @@ public class ParquetFlusher implements Flusher {
 
   private static final Logging logger = new Logging(ParquetFlusher.class);
 
   private final MessageType schema;
-  private final boolean enableParquetInternalBuffering;
   private final long maxChunkSizeInBytes;
   private final Constants.BdecParquetCompression bdecParquetCompression;
 
   /**
-   * Construct parquet flusher from its schema and set flag that indicates whether Parquet memory
-   * optimization is enabled, i.e. rows will be buffered in internal Parquet buffer.
+   * Construct parquet flusher from its schema.
    */
   public ParquetFlusher(
       MessageType schema,
-      boolean enableParquetInternalBuffering,
       long maxChunkSizeInBytes,
       Constants.BdecParquetCompression bdecParquetCompression) {
     this.schema = schema;
-    this.enableParquetInternalBuffering = enableParquetInternalBuffering;
     this.maxChunkSizeInBytes = maxChunkSizeInBytes;
     this.bdecParquetCompression = bdecParquetCompression;
   }
 
   @Override
   public SerializationResult serialize(
-      List> channelsDataPerTable, String filePath)
+      List> channelsDataPerTable, String filePath)
       throws IOException {
-    if (enableParquetInternalBuffering) {
-      return serializeFromParquetWriteBuffers(channelsDataPerTable, filePath);
-    }
     return serializeFromJavaObjects(channelsDataPerTable, filePath);
   }
 
-  private SerializationResult serializeFromParquetWriteBuffers(
-      List> channelsDataPerTable, String filePath)
-      throws IOException {
-    List channelsMetadataList = new ArrayList<>();
-    long rowCount = 0L;
-    float chunkEstimatedUncompressedSize = 0f;
-    String firstChannelFullyQualifiedTableName = null;
-    Map columnEpStatsMapCombined = null;
-    BdecParquetWriter mergedChannelWriter = null;
-    ByteArrayOutputStream mergedChunkData = new ByteArrayOutputStream();
-    Pair chunkMinMaxInsertTimeInMs = null;
-
-    for (ChannelData data : channelsDataPerTable) {
-      // Create channel metadata
-      ChannelMetadata channelMetadata =
-          ChannelMetadata.builder()
-              .setOwningChannelFromContext(data.getChannelContext())
-              .setRowSequencer(data.getRowSequencer())
-              .setOffsetToken(data.getEndOffsetToken())
-              .setStartOffsetToken(data.getStartOffsetToken())
-              .build();
-      // Add channel metadata to the metadata list
-      channelsMetadataList.add(channelMetadata);
-
-      logger.logDebug(
-          "Parquet Flusher: Start building channel={}, rowCount={}, bufferSize={} in blob={}",
-          data.getChannelContext().getFullyQualifiedName(),
-          data.getRowCount(),
-          data.getBufferSize(),
-          filePath);
-
-      if (mergedChannelWriter == null) {
-        columnEpStatsMapCombined = data.getColumnEps();
-        mergedChannelWriter = data.getVectors().parquetWriter;
-        mergedChunkData = data.getVectors().output;
-        firstChannelFullyQualifiedTableName = data.getChannelContext().getFullyQualifiedTableName();
-        chunkMinMaxInsertTimeInMs = data.getMinMaxInsertTimeInMs();
-      } else {
-        // This method assumes that channelsDataPerTable is grouped by table. We double check
-        // here and throw an error if the assumption is violated
-        if (!data.getChannelContext()
-            .getFullyQualifiedTableName()
-            .equals(firstChannelFullyQualifiedTableName)) {
-          throw new SFException(ErrorCode.INVALID_DATA_IN_CHUNK);
-        }
-
-        columnEpStatsMapCombined =
-            ChannelData.getCombinedColumnStatsMap(columnEpStatsMapCombined, data.getColumnEps());
-        data.getVectors().parquetWriter.close();
-        BdecParquetReader.readFileIntoWriter(
-            data.getVectors().output.toByteArray(), mergedChannelWriter);
-        chunkMinMaxInsertTimeInMs =
-            ChannelData.getCombinedMinMaxInsertTimeInMs(
-                chunkMinMaxInsertTimeInMs, data.getMinMaxInsertTimeInMs());
-      }
-
-      rowCount += data.getRowCount();
-      chunkEstimatedUncompressedSize += data.getBufferSize();
-
-      logger.logDebug(
-          "Parquet Flusher: Finish building channel={}, rowCount={}, bufferSize={} in blob={}",
-          data.getChannelContext().getFullyQualifiedName(),
-          data.getRowCount(),
-          data.getBufferSize(),
-          filePath);
-    }
-
-    if (mergedChannelWriter != null) {
-      mergedChannelWriter.close();
-      this.verifyRowCounts(
-          "serializeFromParquetWriteBuffers",
-          mergedChannelWriter,
-          rowCount,
-          channelsDataPerTable,
-          -1);
-    }
-    return new SerializationResult(
-        channelsMetadataList,
-        columnEpStatsMapCombined,
-        rowCount,
-        chunkEstimatedUncompressedSize,
-        mergedChunkData,
-        chunkMinMaxInsertTimeInMs);
-  }
-
   private SerializationResult serializeFromJavaObjects(
       List> channelsDataPerTable, String filePath)
       throws IOException {
@@ -167,13 +74,11 @@ private SerializationResult serializeFromJavaObjects(
       channelsMetadataList.add(channelMetadata);
 
       logger.logDebug(
-          "Parquet Flusher: Start building channel={}, rowCount={}, bufferSize={} in blob={},"
-              + " enableParquetMemoryOptimization={}",
+          "Parquet Flusher: Start building channel={}, rowCount={}, bufferSize={} in blob={}",
           data.getChannelContext().getFullyQualifiedName(),
           data.getRowCount(),
           data.getBufferSize(),
-          filePath,
-          enableParquetInternalBuffering);
+          filePath);
 
       if (rows == null) {
         columnEpStatsMapCombined = data.getColumnEps();
@@ -181,7 +86,7 @@ private SerializationResult serializeFromJavaObjects(
         firstChannelFullyQualifiedTableName = data.getChannelContext().getFullyQualifiedTableName();
         chunkMinMaxInsertTimeInMs = data.getMinMaxInsertTimeInMs();
       } else {
-        // This method assumes that channelsDataPerTable is grouped by table. We double check
+        // This method assumes that channelsDataPerTable is grouped by table. We double-check
         // here and throw an error if the assumption is violated
         if (!data.getChannelContext()
             .getFullyQualifiedTableName()
             .equals(firstChannelFullyQualifiedTableName)) {
@@ -202,13 +107,11 @@ private SerializationResult serializeFromJavaObjects(
       chunkEstimatedUncompressedSize += data.getBufferSize();
 
       logger.logDebug(
-          "Parquet Flusher: Finish building channel={}, rowCount={}, bufferSize={} in blob={},"
-              + " enableParquetMemoryOptimization={}",
+          "Parquet Flusher: Finish building channel={}, rowCount={}, bufferSize={} in blob={}",
           data.getChannelContext().getFullyQualifiedName(),
           data.getRowCount(),
           data.getBufferSize(),
-          filePath,
-          enableParquetInternalBuffering);
+          filePath);
     }
 
     Map metadata = channelsDataPerTable.get(0).getVectors().metadata;
@@ -227,8 +130,7 @@ private SerializationResult serializeFromJavaObjects(
     rows.forEach(parquetWriter::writeRow);
     parquetWriter.close();
 
-    this.verifyRowCounts(
-        "serializeFromJavaObjects", parquetWriter, rowCount, channelsDataPerTable, rows.size());
+    this.verifyRowCounts(parquetWriter, rowCount, channelsDataPerTable, rows.size());
 
     return new SerializationResult(
         channelsMetadataList,
@@ -243,7 +145,6 @@ private SerializationResult serializeFromJavaObjects(
    * Validates that rows count in metadata matches the row count in Parquet footer and the row count
    * written by the parquet writer
    *
-   * @param serializationType Serialization type, used for logging purposes only
    * @param writer Parquet writer writing the data
    * @param channelsDataPerTable Channel data
    * @param totalMetadataRowCount Row count calculated during metadata collection
@@ -251,7 +152,6 @@ private SerializationResult serializeFromJavaObjects(
    *     Used only for logging purposes if there is a mismatch.
    */
   private void verifyRowCounts(
-      String serializationType,
       BdecParquetWriter writer,
       long totalMetadataRowCount,
       List> channelsDataPerTable,
@@ -285,7 +185,7 @@ private void verifyRowCounts(
       throw new SFException(
           ErrorCode.INTERNAL_ERROR,
           String.format(
-              "[%s]The number of rows in Parquet does not match the number of rows in metadata. "
+              "The number of rows in Parquet does not match the number of rows in metadata. "
                   + "parquetTotalRowsInFooter=%d "
                   + "totalMetadataRowCount=%d "
                   + "parquetTotalRowsWritten=%d "
                   + "perChannelRowCountsInMetadata=%s "
                   + "perBlockRowCountsInFooter=%s "
@@ -294,7 +194,6 @@ private void verifyRowCounts(
                   + "channelsCountInMetadata=%d "
                   + "countOfSerializedJavaObjects=%d "
                   + "channelNames=%s",
-              serializationType,
               parquetTotalRowsInFooter,
               totalMetadataRowCount,
               parquetTotalRowsWritten,
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java
index 30851c274..680578eb8 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java
@@ -329,7 +329,6 @@ void closeInternal() {
   public Flusher createFlusher() {
     return new ParquetFlusher(
         schema,
-        clientBufferParameters.getEnableParquetInternalBuffering(),
        clientBufferParameters.getMaxChunkSizeInBytes(),
         clientBufferParameters.getBdecParquetCompression());
   }
diff --git a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java
index 0525737a3..885314b01 100644
--- a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java
+++ b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java
@@ -30,8 +30,6 @@ public class ParameterProvider {
   public static final String BLOB_UPLOAD_MAX_RETRY_COUNT =
       "BLOB_UPLOAD_MAX_RETRY_COUNT".toLowerCase();
   public static final String MAX_MEMORY_LIMIT_IN_BYTES = "MAX_MEMORY_LIMIT_IN_BYTES".toLowerCase();
-  public static final String ENABLE_PARQUET_INTERNAL_BUFFERING =
-      "ENABLE_PARQUET_INTERNAL_BUFFERING".toLowerCase();
   // This should not be needed once we have the ability to track size at table/chunk level
   public static final String MAX_CHANNEL_SIZE_IN_BYTES = "MAX_CHANNEL_SIZE_IN_BYTES".toLowerCase();
   public static final String MAX_CHUNK_SIZE_IN_BYTES = "MAX_CHUNK_SIZE_IN_BYTES".toLowerCase();
@@ -79,10 +77,6 @@ public class ParameterProvider {
   /* Iceberg mode parameters: When streaming to Iceberg mode, different default parameters are
   required because it generates Parquet files instead of BDEC files. */
   public static final int MAX_CHUNKS_IN_BLOB_ICEBERG_MODE_DEFAULT = 1;
-  /* Parameter that enables using internal Parquet buffers for buffering of rows before serializing.
-  It reduces memory consumption compared to using Java Objects for buffering.*/
-  public static final boolean ENABLE_PARQUET_INTERNAL_BUFFERING_DEFAULT = false;
-
   public static final boolean ENABLE_NEW_JSON_PARSING_LOGIC_DEFAULT = true;
 
   /** Map of parameter name to parameter value. This will be set by client/configure API Call. */
@@ -212,13 +206,6 @@ private void setParameterMap(
         props,
         false);
 
-    this.checkAndUpdate(
-        ENABLE_PARQUET_INTERNAL_BUFFERING,
-        ENABLE_PARQUET_INTERNAL_BUFFERING_DEFAULT,
-        parameterOverrides,
-        props,
-        false);
-
     this.checkAndUpdate(
         MAX_CHANNEL_SIZE_IN_BYTES,
         MAX_CHANNEL_SIZE_IN_BYTES_DEFAULT,
@@ -440,14 +427,6 @@ public long getMaxMemoryLimitInBytes() {
     return (val instanceof String) ? Long.parseLong(val.toString()) : (long) val;
   }
 
-  /** @return Return whether memory optimization for Parquet is enabled. */
-  public boolean getEnableParquetInternalBuffering() {
-    Object val =
-        this.parameterMap.getOrDefault(
-            ENABLE_PARQUET_INTERNAL_BUFFERING, ENABLE_PARQUET_INTERNAL_BUFFERING_DEFAULT);
-    return (val instanceof String) ? Boolean.parseBoolean(val.toString()) : (boolean) val;
-  }
-
   /** @return The max channel size in bytes */
   public long getMaxChannelSizeInBytes() {
     Object val =
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java
index 17d90f458..9d8383c0f 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java
@@ -96,7 +96,6 @@ private List> createChannelDataPerTable(
     Mockito.doReturn(
             new ParquetFlusher(
                 schema,
-                enableParquetInternalBuffering,
                 100L,
                 Constants.BdecParquetCompression.GZIP))
         .when(channelData)
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java
index 3f8e927a4..243b20c06 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java
@@ -129,7 +129,6 @@ private AbstractRowBuffer createTestBuffer(OpenChannelRequest.OnErrorOption o
         rs -> {},
         initialState,
         ClientBufferParameters.test_createClientBufferParameters(
-            enableParquetMemoryOptimization,
             MAX_CHUNK_SIZE_IN_BYTES_DEFAULT,
             MAX_ALLOWED_ROW_SIZE_IN_BYTES_DEFAULT,
             Constants.BdecParquetCompression.GZIP,
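
Illustrative usage sketch (not part of the patch): after this change, callers build the flusher and the test buffer parameters without the removed boolean flag. The class and method names below are taken from the diff; the schema, chunk-size, and row-size values are made-up placeholders, and the wrapper class name is hypothetical.

    import net.snowflake.ingest.streaming.internal.ClientBufferParameters;
    import net.snowflake.ingest.streaming.internal.ParquetFlusher;
    import net.snowflake.ingest.utils.Constants;
    import org.apache.parquet.schema.MessageType;
    import org.apache.parquet.schema.PrimitiveType;
    import org.apache.parquet.schema.Types;

    public class FlusherConstructionSketch {
      public static void main(String[] args) {
        // Hypothetical single-column Parquet schema, only for illustration.
        MessageType schema =
            Types.buildMessage()
                .optional(PrimitiveType.PrimitiveTypeName.BINARY)
                .named("C1")
                .named("bdec");

        // New ParquetFlusher signature: schema, max chunk size, compression.
        // The enableParquetInternalBuffering argument is gone.
        ParquetFlusher flusher =
            new ParquetFlusher(
                schema,
                32 * 1024 * 1024L, // assumed max chunk size, not a value from the patch
                Constants.BdecParquetCompression.GZIP);

        // The test helper likewise drops its leading boolean parameter.
        ClientBufferParameters bufferParameters =
            ClientBufferParameters.test_createClientBufferParameters(
                32 * 1024 * 1024L, // maxChunkSizeInBytes
                1024 * 1024L, // maxAllowedRowSizeInBytes (assumed)
                Constants.BdecParquetCompression.GZIP,
                true); // enableNewJsonParsingLogic
      }
    }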