diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetChunkData.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetChunkData.java
index acaccb28b..e3fe97dfb 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetChunkData.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetChunkData.java
@@ -4,7 +4,6 @@
 
 package net.snowflake.ingest.streaming.internal;
 
-import java.io.ByteArrayOutputStream;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -21,9 +20,7 @@ public class ParquetChunkData {
    * @param rows buffered row data as a list
    * @param metadata chunk metadata
    */
-  public ParquetChunkData(
-      List<List<Object>> rows,
-      Map<String, String> metadata) {
+  public ParquetChunkData(List<List<Object>> rows, Map<String, String> metadata) {
     this.rows = rows;
     // create a defensive copy of the parameter map because the argument map passed here
     // may currently be shared across multiple threads.
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java
index f3f2ad054..ddfca4a42 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java
@@ -29,9 +29,7 @@ public class ParquetFlusher implements Flusher<ParquetChunkData> {
 
   private final Constants.BdecParquetCompression bdecParquetCompression;
 
-  /**
-   * Construct parquet flusher from its schema.
-   */
+  /** Construct parquet flusher from its schema. */
   public ParquetFlusher(
       MessageType schema,
       long maxChunkSizeInBytes,
@@ -43,7 +41,7 @@ public ParquetFlusher(
 
   @Override
   public SerializationResult serialize(
-      List<ChannelData<ParquetChunkData>> channelsDataPerTable, String filePath)
+      List<ChannelData<ParquetChunkData>> channelsDataPerTable, String filePath)
       throws IOException {
     return serializeFromJavaObjects(channelsDataPerTable, filePath);
   }
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java
index 90a75096c..5ada286e5 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java
@@ -4,8 +4,6 @@
 
 package net.snowflake.ingest.streaming.internal;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.charset.StandardCharsets;
@@ -24,7 +22,6 @@
 import net.snowflake.ingest.streaming.OpenChannelRequest;
 import net.snowflake.ingest.utils.ErrorCode;
 import net.snowflake.ingest.utils.SFException;
-import org.apache.parquet.hadoop.BdecParquetWriter;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.Type;
@@ -126,7 +123,7 @@ float addRow(
   }
 
   void writeRow(List<Object> row) {
-    data.add(row);
+    data.add(row);
   }
 
   @Override
@@ -231,7 +228,7 @@ boolean hasColumns() {
   @Override
   Optional<ParquetChunkData> getSnapshot() {
     List<List<Object>> oldData = new ArrayList<>();
-    data.forEach(r -> oldData.add(new ArrayList<>(r)));
+    data.forEach(r -> oldData.add(new ArrayList<>(r)));
     return bufferedRowCount <= 0
         ? Optional.empty()
         : Optional.of(new ParquetChunkData(oldData, metadata));
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java
index 8e65f0382..62a6b7a4b 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/BlobBuilderTest.java
@@ -62,16 +62,12 @@ public void testSerializationErrors() throws Exception {
    * Creates a channel data configurable number of rows in metadata and 1 physical row (using both
    * with and without internal buffering optimization)
    */
-  private List<List<ChannelData<ParquetChunkData>>> createChannelDataPerTable(
-      int metadataRowCount) throws IOException {
+  private List<List<ChannelData<ParquetChunkData>>> createChannelDataPerTable(int metadataRowCount)
+      throws IOException {
     String columnName = "C1";
     ChannelData<ParquetChunkData> channelData = Mockito.spy(new ChannelData<>());
     MessageType schema = createSchema(columnName);
-    Mockito.doReturn(
-        new ParquetFlusher(
-            schema,
-            100L,
-            Constants.BdecParquetCompression.GZIP))
+    Mockito.doReturn(new ParquetFlusher(schema, 100L, Constants.BdecParquetCompression.GZIP))
         .when(channelData)
         .createFlusher();
 
@@ -88,8 +84,7 @@ private List<List<ChannelData<ParquetChunkData>>> createChannelDataPerTable(
     bdecParquetWriter.writeRow(Collections.singletonList("1"));
     channelData.setVectors(
         new ParquetChunkData(
-            Collections.singletonList(Collections.singletonList("A")),
-            new HashMap<>()));
+            Collections.singletonList(Collections.singletonList("A")), new HashMap<>()));
     channelData.setColumnEps(new HashMap<>());
     channelData.setRowCount(metadataRowCount);
     channelData.setMinMaxInsertTimeInMs(new Pair<>(2L, 3L));
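
A note on the defensive copies that survive this change: ParquetChunkData still snapshots the metadata map in its constructor, and ParquetRowBuffer.getSnapshot() copies each buffered row list, because (per the comment kept in the diff) the originals may still be mutated by the channel's writer thread after a flush has picked them up. The standalone sketch below illustrates the metadata case; ChunkSnapshot and DefensiveCopyDemo are illustrative names for this sketch, not part of the SDK.

import java.util.HashMap;
import java.util.Map;

class ChunkSnapshot {
  private final Map<String, String> metadata;

  ChunkSnapshot(Map<String, String> metadata) {
    // Defensive copy: without it, a concurrent put() on the caller's map could be
    // observed mid-serialization, or a ConcurrentModificationException could be
    // thrown while this chunk's metadata is being iterated.
    this.metadata = new HashMap<>(metadata);
  }

  Map<String, String> metadata() {
    return metadata;
  }
}

public class DefensiveCopyDemo {
  public static void main(String[] args) throws InterruptedException {
    Map<String, String> shared = new HashMap<>();
    shared.put("rowCount", "1");

    // Snapshot taken before any further mutation, as at flush time.
    ChunkSnapshot snapshot = new ChunkSnapshot(shared);

    // Another thread keeps mutating the shared map, as the row buffer does
    // between flushes.
    Thread writer = new Thread(() -> shared.put("rowCount", "2"));
    writer.start();
    writer.join();

    // The snapshot still reflects the state at construction time.
    System.out.println(snapshot.metadata().get("rowCount")); // prints 1
  }
}

The row-data copy in getSnapshot() follows the same idea one level deeper: each inner list is copied so that later writeRow() calls cannot leak rows into a chunk already handed to the flusher.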