From 3cf3eb50ab2352dcdedd9f2fe8b110544295b7e0 Mon Sep 17 00:00:00 2001
From: Gloria Doci
Date: Mon, 16 Sep 2024 15:05:09 +0200
Subject: [PATCH] SNOW-1659373 cleanup serializeFromParquetBuffers (#829)

SNOW-1659373 remove serializeFromParquetBuffers
---
 .../internal/ClientBufferParameters.java      |  18 ---
 .../streaming/internal/ParquetChunkData.java  |  15 +--
 .../streaming/internal/ParquetFlusher.java    | 119 ++----------------
 .../streaming/internal/ParquetRowBuffer.java  |  55 +-------
 .../ingest/utils/ParameterProvider.java       |  21 ----
 .../streaming/internal/BlobBuilderTest.java   |  46 +------
 .../streaming/internal/RowBufferTest.java     |   7 --
 7 files changed, 19 insertions(+), 262 deletions(-)

diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java b/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java
index ac05c814e..c73337d1e 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java
@@ -10,8 +10,6 @@
 /** Channel's buffer relevant parameters that are set at the owning client level. */
 public class ClientBufferParameters {
 
-  private boolean enableParquetInternalBuffering;
-
   private long maxChunkSizeInBytes;
 
   private long maxAllowedRowSizeInBytes;
@@ -23,18 +21,14 @@ public class ClientBufferParameters {
   /**
    * Private constructor used for test methods
    *
-   * @param enableParquetInternalBuffering flag whether buffering in internal Parquet buffers is
-   *     enabled
    * @param maxChunkSizeInBytes maximum chunk size in bytes
    * @param maxAllowedRowSizeInBytes maximum row size in bytes
    */
   private ClientBufferParameters(
-      boolean enableParquetInternalBuffering,
       long maxChunkSizeInBytes,
       long maxAllowedRowSizeInBytes,
       Constants.BdecParquetCompression bdecParquetCompression,
       boolean enableNewJsonParsingLogic) {
-    this.enableParquetInternalBuffering = enableParquetInternalBuffering;
     this.maxChunkSizeInBytes = maxChunkSizeInBytes;
     this.maxAllowedRowSizeInBytes = maxAllowedRowSizeInBytes;
     this.bdecParquetCompression = bdecParquetCompression;
@@ -43,10 +37,6 @@ private ClientBufferParameters(
 
   /** @param clientInternal reference to the client object where the relevant parameters are set */
   public ClientBufferParameters(SnowflakeStreamingIngestClientInternal clientInternal) {
-    this.enableParquetInternalBuffering =
-        clientInternal != null
-            ? clientInternal.getParameterProvider().getEnableParquetInternalBuffering()
-            : ParameterProvider.ENABLE_PARQUET_INTERNAL_BUFFERING_DEFAULT;
     this.maxChunkSizeInBytes =
         clientInternal != null
            ? clientInternal.getParameterProvider().getMaxChunkSizeInBytes()
@@ -67,30 +57,22 @@ public ClientBufferParameters(SnowflakeStreamingIngestClientInter
   }
 
   /**
-   * @param enableParquetInternalBuffering flag whether buffering in internal Parquet buffers is
-   *     enabled
    * @param maxChunkSizeInBytes maximum chunk size in bytes
    * @param maxAllowedRowSizeInBytes maximum row size in bytes
    * @return ClientBufferParameters object
    */
   public static ClientBufferParameters test_createClientBufferParameters(
-      boolean enableParquetInternalBuffering,
       long maxChunkSizeInBytes,
       long maxAllowedRowSizeInBytes,
       Constants.BdecParquetCompression bdecParquetCompression,
       boolean enableNewJsonParsingLogic) {
     return new ClientBufferParameters(
-        enableParquetInternalBuffering,
         maxChunkSizeInBytes,
         maxAllowedRowSizeInBytes,
         bdecParquetCompression,
         enableNewJsonParsingLogic);
   }
 
-  public boolean getEnableParquetInternalBuffering() {
-    return enableParquetInternalBuffering;
-  }
-
   public long getMaxChunkSizeInBytes() {
     return maxChunkSizeInBytes;
   }
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetChunkData.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetChunkData.java
index 9950c44aa..e3fe97dfb 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetChunkData.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetChunkData.java
@@ -4,37 +4,24 @@
 
 package net.snowflake.ingest.streaming.internal;
 
-import java.io.ByteArrayOutputStream;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import org.apache.parquet.hadoop.BdecParquetWriter;
 
 /** Parquet data holder to buffer rows. */
 public class ParquetChunkData {
   // buffered rows serialized into Java objects. Needed for the Parquet w/o memory optimization.
   final List<List<Object>> rows;
-
-  final BdecParquetWriter parquetWriter;
-  final ByteArrayOutputStream output;
   final Map<String, String> metadata;
 
   /**
    * Construct parquet data chunk.
    *
    * @param rows buffered row data as a list
-   * @param parquetWriter buffered parquet row data
-   * @param output byte array file output
    * @param metadata chunk metadata
    */
-  public ParquetChunkData(
-      List<List<Object>> rows,
-      BdecParquetWriter parquetWriter,
-      ByteArrayOutputStream output,
-      Map<String, String> metadata) {
+  public ParquetChunkData(List<List<Object>> rows, Map<String, String> metadata) {
     this.rows = rows;
-    this.parquetWriter = parquetWriter;
-    this.output = output;
     // create a defensive copy of the parameter map because the argument map passed here
     // may currently be shared across multiple threads.
     this.metadata = createDefensiveCopy(metadata);
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java
index d338a6a7b..ddfca4a42 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java
@@ -15,7 +15,6 @@
 import net.snowflake.ingest.utils.Logging;
 import net.snowflake.ingest.utils.Pair;
 import net.snowflake.ingest.utils.SFException;
-import org.apache.parquet.hadoop.BdecParquetReader;
 import org.apache.parquet.hadoop.BdecParquetWriter;
 import org.apache.parquet.schema.MessageType;
 
@@ -26,22 +25,16 @@ public class ParquetFlusher implements Flusher<ParquetChunkData> {
   private static final Logging logger = new Logging(ParquetFlusher.class);
 
   private final MessageType schema;
-  private final boolean enableParquetInternalBuffering;
   private final long maxChunkSizeInBytes;
 
   private final Constants.BdecParquetCompression bdecParquetCompression;
 
-  /**
-   * Construct parquet flusher from its schema and set flag that indicates whether Parquet memory
-   * optimization is enabled, i.e. rows will be buffered in internal Parquet buffer.
-   */
+  /** Construct parquet flusher from its schema. */
   public ParquetFlusher(
       MessageType schema,
-      boolean enableParquetInternalBuffering,
       long maxChunkSizeInBytes,
       Constants.BdecParquetCompression bdecParquetCompression) {
     this.schema = schema;
-    this.enableParquetInternalBuffering = enableParquetInternalBuffering;
     this.maxChunkSizeInBytes = maxChunkSizeInBytes;
     this.bdecParquetCompression = bdecParquetCompression;
   }
@@ -50,97 +43,9 @@ public ParquetFlusher(
   public SerializationResult serialize(
       List<ChannelData<ParquetChunkData>> channelsDataPerTable, String filePath)
       throws IOException {
-    if (enableParquetInternalBuffering) {
-      return serializeFromParquetWriteBuffers(channelsDataPerTable, filePath);
-    }
     return serializeFromJavaObjects(channelsDataPerTable, filePath);
   }
 
-  private SerializationResult serializeFromParquetWriteBuffers(
-      List<ChannelData<ParquetChunkData>> channelsDataPerTable, String filePath)
-      throws IOException {
-    List<ChannelMetadata> channelsMetadataList = new ArrayList<>();
-    long rowCount = 0L;
-    float chunkEstimatedUncompressedSize = 0f;
-    String firstChannelFullyQualifiedTableName = null;
-    Map<String, RowBufferStats> columnEpStatsMapCombined = null;
-    BdecParquetWriter mergedChannelWriter = null;
-    ByteArrayOutputStream mergedChunkData = new ByteArrayOutputStream();
-    Pair<Long, Long> chunkMinMaxInsertTimeInMs = null;
-
-    for (ChannelData<ParquetChunkData> data : channelsDataPerTable) {
-      // Create channel metadata
-      ChannelMetadata channelMetadata =
-          ChannelMetadata.builder()
-              .setOwningChannelFromContext(data.getChannelContext())
-              .setRowSequencer(data.getRowSequencer())
-              .setOffsetToken(data.getEndOffsetToken())
-              .setStartOffsetToken(data.getStartOffsetToken())
-              .build();
-      // Add channel metadata to the metadata list
-      channelsMetadataList.add(channelMetadata);
-
-      logger.logDebug(
-          "Parquet Flusher: Start building channel={}, rowCount={}, bufferSize={} in blob={}",
-          data.getChannelContext().getFullyQualifiedName(),
-          data.getRowCount(),
-          data.getBufferSize(),
-          filePath);
-
-      if (mergedChannelWriter == null) {
-        columnEpStatsMapCombined = data.getColumnEps();
-        mergedChannelWriter = data.getVectors().parquetWriter;
-        mergedChunkData = data.getVectors().output;
-        firstChannelFullyQualifiedTableName = data.getChannelContext().getFullyQualifiedTableName();
-        chunkMinMaxInsertTimeInMs = data.getMinMaxInsertTimeInMs();
-      } else {
-        // This method assumes that channelsDataPerTable is grouped by table. We double check
-        // here and throw an error if the assumption is violated
-        if (!data.getChannelContext()
-            .getFullyQualifiedTableName()
-            .equals(firstChannelFullyQualifiedTableName)) {
-          throw new SFException(ErrorCode.INVALID_DATA_IN_CHUNK);
-        }
-
-        columnEpStatsMapCombined =
-            ChannelData.getCombinedColumnStatsMap(columnEpStatsMapCombined, data.getColumnEps());
-        data.getVectors().parquetWriter.close();
-        BdecParquetReader.readFileIntoWriter(
-            data.getVectors().output.toByteArray(), mergedChannelWriter);
-        chunkMinMaxInsertTimeInMs =
-            ChannelData.getCombinedMinMaxInsertTimeInMs(
-                chunkMinMaxInsertTimeInMs, data.getMinMaxInsertTimeInMs());
-      }
-
-      rowCount += data.getRowCount();
-      chunkEstimatedUncompressedSize += data.getBufferSize();
-
-      logger.logDebug(
-          "Parquet Flusher: Finish building channel={}, rowCount={}, bufferSize={} in blob={}",
-          data.getChannelContext().getFullyQualifiedName(),
-          data.getRowCount(),
-          data.getBufferSize(),
-          filePath);
-    }
-
-    if (mergedChannelWriter != null) {
-      mergedChannelWriter.close();
-      this.verifyRowCounts(
-          "serializeFromParquetWriteBuffers",
-          mergedChannelWriter,
-          rowCount,
-          channelsDataPerTable,
-          -1);
-    }
-    return new SerializationResult(
-        channelsMetadataList,
-        columnEpStatsMapCombined,
-        rowCount,
-        chunkEstimatedUncompressedSize,
-        mergedChunkData,
-        chunkMinMaxInsertTimeInMs);
-  }
-
   private SerializationResult serializeFromJavaObjects(
       List<ChannelData<ParquetChunkData>> channelsDataPerTable, String filePath)
       throws IOException {
@@ -167,13 +72,11 @@ private SerializationResult serializeFromJavaObjects(
       channelsMetadataList.add(channelMetadata);
 
       logger.logDebug(
-          "Parquet Flusher: Start building channel={}, rowCount={}, bufferSize={} in blob={},"
-              + " enableParquetMemoryOptimization={}",
+          "Parquet Flusher: Start building channel={}, rowCount={}, bufferSize={} in blob={}",
           data.getChannelContext().getFullyQualifiedName(),
           data.getRowCount(),
           data.getBufferSize(),
-          filePath,
-          enableParquetInternalBuffering);
+          filePath);
 
       if (rows == null) {
         columnEpStatsMapCombined = data.getColumnEps();
@@ -181,7 +84,7 @@ private SerializationResult serializeFromJavaObjects(
         firstChannelFullyQualifiedTableName = data.getChannelContext().getFullyQualifiedTableName();
         chunkMinMaxInsertTimeInMs = data.getMinMaxInsertTimeInMs();
       } else {
-        // This method assumes that channelsDataPerTable is grouped by table. We double check
+        // This method assumes that channelsDataPerTable is grouped by table. We double-check
         // here and throw an error if the assumption is violated
         if (!data.getChannelContext()
             .getFullyQualifiedTableName()
             .equals(firstChannelFullyQualifiedTableName)) {
@@ -202,13 +105,11 @@ private SerializationResult serializeFromJavaObjects(
       chunkEstimatedUncompressedSize += data.getBufferSize();
 
       logger.logDebug(
-          "Parquet Flusher: Finish building channel={}, rowCount={}, bufferSize={} in blob={},"
-              + " enableParquetMemoryOptimization={}",
+          "Parquet Flusher: Finish building channel={}, rowCount={}, bufferSize={} in blob={}",
           data.getChannelContext().getFullyQualifiedName(),
           data.getRowCount(),
           data.getBufferSize(),
-          filePath,
-          enableParquetInternalBuffering);
+          filePath);
     }
 
     Map<String, String> metadata = channelsDataPerTable.get(0).getVectors().metadata;
@@ -227,8 +128,7 @@ private SerializationResult serializeFromJavaObjects(
     rows.forEach(parquetWriter::writeRow);
     parquetWriter.close();
 
-    this.verifyRowCounts(
-        "serializeFromJavaObjects", parquetWriter, rowCount, channelsDataPerTable, rows.size());
+    this.verifyRowCounts(parquetWriter, rowCount, channelsDataPerTable, rows.size());
 
     return new SerializationResult(
         channelsMetadataList,
@@ -243,7 +143,6 @@ private SerializationResult serializeFromJavaObjects(
    * Validates that rows count in metadata matches the row count in Parquet footer and the row count
    * written by the parquet writer
    *
-   * @param serializationType Serialization type, used for logging purposes only
    * @param writer Parquet writer writing the data
    * @param channelsDataPerTable Channel data
    * @param totalMetadataRowCount Row count calculated during metadata collection
@@ -251,7 +150,6 @@ private SerializationResult serializeFromJavaObjects(
    *     Used only for logging purposes if there is a mismatch.
    */
   private void verifyRowCounts(
-      String serializationType,
       BdecParquetWriter writer,
       long totalMetadataRowCount,
       List<ChannelData<ParquetChunkData>> channelsDataPerTable,
@@ -285,7 +183,7 @@ private void verifyRowCounts(
       throw new SFException(
           ErrorCode.INTERNAL_ERROR,
           String.format(
-              "[%s]The number of rows in Parquet does not match the number of rows in metadata. "
+              "The number of rows in Parquet does not match the number of rows in metadata. "
                   + "parquetTotalRowsInFooter=%d "
                   + "totalMetadataRowCount=%d "
                   + "parquetTotalRowsWritten=%d "
                   + "perChannelRowCountsInMetadata=%s "
                   + "perBlockRowCountsInFooter=%s "
                   + "channelsCountInMetadata=%d "
                   + "countOfSerializedJavaObjects=%d "
                   + "channelNames=%s",
-              serializationType,
               parquetTotalRowsInFooter,
               totalMetadataRowCount,
               parquetTotalRowsWritten,
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java
index 30851c274..5ada286e5 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java
@@ -4,8 +4,6 @@
 
 package net.snowflake.ingest.streaming.internal;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.charset.StandardCharsets;
@@ -24,7 +22,6 @@
 import net.snowflake.ingest.streaming.OpenChannelRequest;
 import net.snowflake.ingest.utils.ErrorCode;
 import net.snowflake.ingest.utils.SFException;
-import org.apache.parquet.hadoop.BdecParquetWriter;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.Type;
@@ -43,11 +40,6 @@ public class ParquetRowBuffer extends AbstractRowBuffer<ParquetChunkData> {
   /* Unflushed rows as Java objects. Needed for the Parquet w/o memory optimization. */
   private final List<List<Object>> data;
-
-  /* BDEC Parquet writer. It is used to buffer unflushed data in Parquet internal buffers instead of using Java objects */
-  private BdecParquetWriter bdecParquetWriter;
-
-  private ByteArrayOutputStream fileOutput;
 
   private final List<List<Object>> tempData;
 
   private MessageType schema;
@@ -111,33 +103,10 @@ public void setupSchema(List<ColumnMetadata> columns) {
       id++;
     }
     schema = new MessageType(PARQUET_MESSAGE_TYPE_NAME, parquetTypes);
-    createFileWriter();
     tempData.clear();
     data.clear();
   }
 
-  /** Create BDEC file writer if Parquet memory optimization is enabled. */
-  private void createFileWriter() {
-    fileOutput = new ByteArrayOutputStream();
-    try {
-      if (clientBufferParameters.getEnableParquetInternalBuffering()) {
-        bdecParquetWriter =
-            new BdecParquetWriter(
-                fileOutput,
-                schema,
-                metadata,
-                channelFullyQualifiedName,
-                clientBufferParameters.getMaxChunkSizeInBytes(),
-                clientBufferParameters.getBdecParquetCompression());
-      } else {
-        this.bdecParquetWriter = null;
-      }
-      data.clear();
-    } catch (IOException e) {
-      throw new SFException(ErrorCode.INTERNAL_ERROR, "cannot create parquet writer", e);
-    }
-  }
-
   @Override
   boolean hasColumn(String name) {
     return fieldIndex.containsKey(name);
@@ -154,11 +123,7 @@ float addRow(
   }
 
   void writeRow(List<Object> row) {
-    if (clientBufferParameters.getEnableParquetInternalBuffering()) {
-      bdecParquetWriter.writeRow(row);
-    } else {
-      data.add(row);
-    }
+    data.add(row);
   }
 
   @Override
@@ -263,12 +228,10 @@ boolean hasColumns() {
   @Override
   Optional<ParquetChunkData> getSnapshot() {
     List<List<Object>> oldData = new ArrayList<>();
-    if (!clientBufferParameters.getEnableParquetInternalBuffering()) {
-      data.forEach(r -> oldData.add(new ArrayList<>(r)));
-    }
+    data.forEach(r -> oldData.add(new ArrayList<>(r)));
     return bufferedRowCount <= 0
         ? Optional.empty()
-        : Optional.of(new ParquetChunkData(oldData, bdecParquetWriter, fileOutput, metadata));
+        : Optional.of(new ParquetChunkData(oldData, metadata));
   }
 
   /** Used only for testing. */
@@ -309,27 +272,17 @@ int getTempRowCount() {
   @Override
   void reset() {
     super.reset();
-    createFileWriter();
     data.clear();
   }
 
   /** Close the row buffer by releasing its internal resources. */
   @Override
-  void closeInternal() {
-    if (bdecParquetWriter != null) {
-      try {
-        bdecParquetWriter.close();
-      } catch (IOException e) {
-        throw new SFException(ErrorCode.INTERNAL_ERROR, "Failed to close parquet writer", e);
-      }
-    }
-  }
+  void closeInternal() {}
 
   @Override
   public Flusher<ParquetChunkData> createFlusher() {
     return new ParquetFlusher(
         schema,
-        clientBufferParameters.getEnableParquetInternalBuffering(),
         clientBufferParameters.getMaxChunkSizeInBytes(),
         clientBufferParameters.getBdecParquetCompression());
   }
diff --git a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java
index 0525737a3..885314b01 100644
--- a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java
+++ b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java
@@ -30,8 +30,6 @@ public class ParameterProvider {
   public static final String BLOB_UPLOAD_MAX_RETRY_COUNT =
       "BLOB_UPLOAD_MAX_RETRY_COUNT".toLowerCase();
   public static final String MAX_MEMORY_LIMIT_IN_BYTES = "MAX_MEMORY_LIMIT_IN_BYTES".toLowerCase();
-  public static final String ENABLE_PARQUET_INTERNAL_BUFFERING =
-      "ENABLE_PARQUET_INTERNAL_BUFFERING".toLowerCase();
   // This should not be needed once we have the ability to track size at table/chunk level
   public static final String MAX_CHANNEL_SIZE_IN_BYTES = "MAX_CHANNEL_SIZE_IN_BYTES".toLowerCase();
   public static final String MAX_CHUNK_SIZE_IN_BYTES = "MAX_CHUNK_SIZE_IN_BYTES".toLowerCase();
@@ -79,10 +77,6 @@ public class ParameterProvider {
   /* Iceberg mode parameters: When streaming to Iceberg mode, different default parameters are
   required because it generates Parquet files instead of BDEC files. */
   public static final int MAX_CHUNKS_IN_BLOB_ICEBERG_MODE_DEFAULT = 1;
-  /* Parameter that enables using internal Parquet buffers for buffering of rows before serializing.
-  It reduces memory consumption compared to using Java Objects for buffering.*/
-  public static final boolean ENABLE_PARQUET_INTERNAL_BUFFERING_DEFAULT = false;
-
   public static final boolean ENABLE_NEW_JSON_PARSING_LOGIC_DEFAULT = true;
 
   /** Map of parameter name to parameter value. This will be set by client/configure API Call. */
@@ -212,13 +206,6 @@ private void setParameterMap(
         props,
         false);
 
-    this.checkAndUpdate(
-        ENABLE_PARQUET_INTERNAL_BUFFERING,
-        ENABLE_PARQUET_INTERNAL_BUFFERING_DEFAULT,
-        parameterOverrides,
-        props,
-        false);
-
     this.checkAndUpdate(
         MAX_CHANNEL_SIZE_IN_BYTES,
         MAX_CHANNEL_SIZE_IN_BYTES_DEFAULT,
@@ -440,14 +427,6 @@ public long getMaxMemoryLimitInBytes() {
     return (val instanceof String) ? Long.parseLong(val.toString()) : (long) val;
   }
 
-  /** @return Return whether memory optimization for Parquet is enabled. */
-  public boolean getEnableParquetInternalBuffering() {
-    Object val =
-        this.parameterMap.getOrDefault(
-            ENABLE_PARQUET_INTERNAL_BUFFERING, ENABLE_PARQUET_INTERNAL_BUFFERING_DEFAULT);
-    return (val instanceof String) ? Boolean.parseBoolean(val.toString()) : (boolean) val;
-  }
-
   /** @return The max channel size in bytes */
   public long getMaxChannelSizeInBytes() {
     Object val =
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java
index 17d90f458..62a6b7a4b 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java
@@ -35,12 +35,7 @@ public void testSerializationErrors() throws Exception {
     // Construction succeeds if both data and metadata contain 1 row
     BlobBuilder.constructBlobAndMetadata(
         "a.bdec",
-        Collections.singletonList(createChannelDataPerTable(1, false)),
-        Constants.BdecVersion.THREE,
-        encrypt);
-    BlobBuilder.constructBlobAndMetadata(
-        "a.bdec",
-        Collections.singletonList(createChannelDataPerTable(1, true)),
+        Collections.singletonList(createChannelDataPerTable(1)),
         Constants.BdecVersion.THREE,
         encrypt);
 
@@ -48,13 +43,11 @@ public void testSerializationErrors() throws Exception {
     try {
       BlobBuilder.constructBlobAndMetadata(
           "a.bdec",
-          Collections.singletonList(createChannelDataPerTable(0, false)),
+          Collections.singletonList(createChannelDataPerTable(0)),
          Constants.BdecVersion.THREE,
           encrypt);
-      Assert.fail("Should not pass enableParquetInternalBuffering=false");
     } catch (SFException e) {
       Assert.assertEquals(ErrorCode.INTERNAL_ERROR.getMessageCode(), e.getVendorCode());
-      Assert.assertTrue(e.getMessage().contains("serializeFromJavaObjects"));
       Assert.assertTrue(e.getMessage().contains("parquetTotalRowsInFooter=1"));
       Assert.assertTrue(e.getMessage().contains("totalMetadataRowCount=0"));
       Assert.assertTrue(e.getMessage().contains("parquetTotalRowsWritten=1"));
@@ -63,42 +56,18 @@ public void testSerializationErrors() throws Exception {
       Assert.assertTrue(e.getMessage().contains("channelsCountInMetadata=1"));
       Assert.assertTrue(e.getMessage().contains("countOfSerializedJavaObjects=1"));
     }
-
-    try {
-      BlobBuilder.constructBlobAndMetadata(
-          "a.bdec",
-          Collections.singletonList(createChannelDataPerTable(0, true)),
-          Constants.BdecVersion.THREE,
-          encrypt);
-      Assert.fail("Should not pass enableParquetInternalBuffering=true");
-    } catch (SFException e) {
-      Assert.assertEquals(ErrorCode.INTERNAL_ERROR.getMessageCode(), e.getVendorCode());
-      Assert.assertTrue(e.getMessage().contains("serializeFromParquetWriteBuffers"));
-      Assert.assertTrue(e.getMessage().contains("parquetTotalRowsInFooter=1"));
-      Assert.assertTrue(e.getMessage().contains("totalMetadataRowCount=0"));
-      Assert.assertTrue(e.getMessage().contains("parquetTotalRowsWritten=1"));
-      Assert.assertTrue(e.getMessage().contains("perChannelRowCountsInMetadata=0"));
-      Assert.assertTrue(e.getMessage().contains("perBlockRowCountsInFooter=1"));
-      Assert.assertTrue(e.getMessage().contains("channelsCountInMetadata=1"));
-      Assert.assertTrue(e.getMessage().contains("countOfSerializedJavaObjects=-1"));
-    }
   }
 
   /**
    * Creates a channel data configurable number of rows in metadata and 1 physical row (using both
    * with and without internal buffering optimization)
    */
-  private List<ChannelData<ParquetChunkData>> createChannelDataPerTable(
-      int metadataRowCount, boolean enableParquetInternalBuffering) throws IOException {
+  private List<ChannelData<ParquetChunkData>> createChannelDataPerTable(int metadataRowCount)
+      throws IOException {
     String columnName = "C1";
     ChannelData<ParquetChunkData> channelData = Mockito.spy(new ChannelData<>());
     MessageType schema = createSchema(columnName);
-    Mockito.doReturn(
-        new ParquetFlusher(
-            schema,
-            enableParquetInternalBuffering,
-            100L,
-            Constants.BdecParquetCompression.GZIP))
+    Mockito.doReturn(new ParquetFlusher(schema, 100L, Constants.BdecParquetCompression.GZIP))
         .when(channelData)
         .createFlusher();
 
@@ -115,10 +84,7 @@ private List<ChannelData<ParquetChunkData>> createChannelDataPerTable(
     bdecParquetWriter.writeRow(Collections.singletonList("1"));
     channelData.setVectors(
         new ParquetChunkData(
-            Collections.singletonList(Collections.singletonList("A")),
-            bdecParquetWriter,
-            stream,
-            new HashMap<>()));
+            Collections.singletonList(Collections.singletonList("A")), new HashMap<>()));
     channelData.setColumnEps(new HashMap<>());
     channelData.setRowCount(metadataRowCount);
     channelData.setMinMaxInsertTimeInMs(new Pair<>(2L, 3L));
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java
index 3f8e927a4..f0a867075 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java
@@ -31,16 +31,10 @@
 import org.junit.Test;
 
 public class RowBufferTest {
-
-  private final boolean enableParquetMemoryOptimization;
   private AbstractRowBuffer<?> rowBufferOnErrorContinue;
   private AbstractRowBuffer<?> rowBufferOnErrorAbort;
   private AbstractRowBuffer<?> rowBufferOnErrorSkipBatch;
 
-  public RowBufferTest() {
-    this.enableParquetMemoryOptimization = false;
-  }
-
   @Before
   public void setupRowBuffer() {
     this.rowBufferOnErrorContinue = createTestBuffer(OpenChannelRequest.OnErrorOption.CONTINUE);
@@ -129,7 +123,6 @@ private AbstractRowBuffer<?> createTestBuffer(OpenChannelRequest.OnErrorOption o
         rs -> {},
         initialState,
         ClientBufferParameters.test_createClientBufferParameters(
-            enableParquetMemoryOptimization,
            MAX_CHUNK_SIZE_IN_BYTES_DEFAULT,
             MAX_ALLOWED_ROW_SIZE_IN_BYTES_DEFAULT,
             Constants.BdecParquetCompression.GZIP,
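
After this cleanup, the three constructors touched by the patch no longer take the internal-buffering flag. The sketch below is not part of the patch; it is a minimal, hypothetical illustration of the updated call shapes, assuming only the signatures visible in the diff above (the wrapper class name and the literal sizes are invented for illustration):

package net.snowflake.ingest.streaming.internal; // same package as the classes in the diff

import java.util.Collections;
import java.util.HashMap;
import net.snowflake.ingest.utils.Constants;
import org.apache.parquet.schema.MessageType;

// Illustrative-only helper showing the post-cleanup constructor shapes.
class PostCleanupUsageSketch {

  static void wireUp(MessageType schema) {
    // ParquetFlusher now takes only the schema, the max chunk size, and the compression codec.
    ParquetFlusher flusher =
        new ParquetFlusher(schema, 100L, Constants.BdecParquetCompression.GZIP);

    // ParquetChunkData now carries only the buffered rows and the chunk metadata.
    ParquetChunkData chunk =
        new ParquetChunkData(
            Collections.singletonList(Collections.singletonList("A")), new HashMap<>());

    // The test-only factory drops the flag as well; the remaining arguments are unchanged.
    ClientBufferParameters parameters =
        ClientBufferParameters.test_createClientBufferParameters(
            100L, 100L, Constants.BdecParquetCompression.GZIP, true);
  }
}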