From 570894c9ac287abc22f8243594847f80cafa4bae Mon Sep 17 00:00:00 2001 From: Toby Zhang Date: Tue, 17 Oct 2023 12:47:55 -0700 Subject: [PATCH 01/10] NO-SNOW: Fix two issues in insertRows API (#602) This PR contains two fixes in the current insertRows logic: - We can't thrown any exception when ON_ERROR=CONTINUE, since it will cause duplicate data if we insert rows into the buffer but not updating the offset token - Update the row buffer only if the rows are inserted successfully for ON_ERROR=SKIP_BATCH --- .../streaming/internal/AbstractRowBuffer.java | 25 +++++++++---------- .../streaming/internal/RowBufferTest.java | 1 - 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java index 30be56fcb..3029e7a49 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java @@ -150,6 +150,9 @@ public InsertValidationResponse insertRows( InsertValidationResponse.InsertError error = new InsertValidationResponse.InsertError(row, rowIndex); try { + if (rowBuffer.bufferedRowCount == Integer.MAX_VALUE) { + throw new SFException(ErrorCode.INTERNAL_ERROR, "Row count reaches MAX value"); + } Set inputColumnNames = verifyInputColumns(row, error, rowIndex); rowsSizeInBytes += addRow( @@ -163,11 +166,7 @@ public InsertValidationResponse insertRows( error.setException(new SFException(e, ErrorCode.INTERNAL_ERROR, e.getMessage())); response.addError(error); } - checkBatchSizeEnforcedMaximum(rowsSizeInBytes); rowIndex++; - if (rowBuffer.bufferedRowCount == Integer.MAX_VALUE) { - throw new SFException(ErrorCode.INTERNAL_ERROR, "Row count reaches MAX value"); - } } checkBatchSizeRecommendedMaximum(rowsSizeInBytes); rowBuffer.channelState.setOffsetToken(offsetToken); @@ -191,17 +190,17 @@ public InsertValidationResponse insertRows( Set inputColumnNames = verifyInputColumns(row, null, tempRowCount); tempRowsSizeInBytes += addTempRow(row, tempRowCount, rowBuffer.tempStatsMap, inputColumnNames, tempRowCount); - checkBatchSizeEnforcedMaximum(tempRowsSizeInBytes); tempRowCount++; + checkBatchSizeEnforcedMaximum(tempRowsSizeInBytes); + if ((long) rowBuffer.bufferedRowCount + tempRowCount >= Integer.MAX_VALUE) { + throw new SFException(ErrorCode.INTERNAL_ERROR, "Row count reaches MAX value"); + } } checkBatchSizeRecommendedMaximum(tempRowsSizeInBytes); moveTempRowsToActualBuffer(tempRowCount); rowsSizeInBytes = tempRowsSizeInBytes; - if ((long) rowBuffer.bufferedRowCount + tempRowCount >= Integer.MAX_VALUE) { - throw new SFException(ErrorCode.INTERNAL_ERROR, "Row count reaches MAX value"); - } rowBuffer.bufferedRowCount += tempRowCount; rowBuffer.statsMap.forEach( (colName, stats) -> @@ -241,15 +240,15 @@ public InsertValidationResponse insertRows( response.addError(error); } checkBatchSizeEnforcedMaximum(tempRowsSizeInBytes); + if ((long) rowBuffer.bufferedRowCount + tempRowCount >= Integer.MAX_VALUE) { + throw new SFException(ErrorCode.INTERNAL_ERROR, "Row count reaches MAX value"); + } } if (!response.hasErrors()) { checkBatchSizeRecommendedMaximum(tempRowsSizeInBytes); moveTempRowsToActualBuffer(tempRowCount); rowsSizeInBytes = tempRowsSizeInBytes; - if ((long) rowBuffer.bufferedRowCount + tempRowCount >= Integer.MAX_VALUE) { - throw new SFException(ErrorCode.INTERNAL_ERROR, "Row count reaches MAX value"); - } rowBuffer.bufferedRowCount += 
tempRowCount; rowBuffer.statsMap.forEach( (colName, stats) -> @@ -257,9 +256,9 @@ public InsertValidationResponse insertRows( colName, RowBufferStats.getCombinedStats(stats, rowBuffer.tempStatsMap.get(colName)))); rowBuffer.channelState.setOffsetToken(offsetToken); + rowBuffer.bufferSize += rowsSizeInBytes; + rowBuffer.rowSizeMetric.accept(rowsSizeInBytes); } - rowBuffer.bufferSize += rowsSizeInBytes; - rowBuffer.rowSizeMetric.accept(rowsSizeInBytes); return response; } } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java index 549eefbde..269b6f28d 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java @@ -945,7 +945,6 @@ private void testE2ETimeHelper(OpenChannelRequest.OnErrorOption onErrorOption) { @Test public void testMaxInsertRowsBatchSize() { - testMaxInsertRowsBatchSizeHelper(OpenChannelRequest.OnErrorOption.CONTINUE); testMaxInsertRowsBatchSizeHelper(OpenChannelRequest.OnErrorOption.ABORT); testMaxInsertRowsBatchSizeHelper(OpenChannelRequest.OnErrorOption.SKIP_BATCH); } From 5aa45d3c9e8b702a2f63e570e307c00bb238e7b7 Mon Sep 17 00:00:00 2001 From: Lukas Sembera Date: Wed, 18 Oct 2023 13:22:58 +0200 Subject: [PATCH 02/10] SNOW-902709 Limit the max allowed number of chunks in blob (#580) --- .../streaming/internal/FlushService.java | 12 ++ ...nowflakeStreamingIngestClientInternal.java | 48 ++++- .../ingest/utils/ParameterProvider.java | 21 ++ .../streaming/internal/FlushServiceTest.java | 119 +++++++++++- .../streaming/internal/ManyTablesIT.java | 98 ++++++++++ .../internal/ParameterProviderTest.java | 9 + .../SnowflakeStreamingIngestClientTest.java | 180 +++++++++++++----- 7 files changed, 439 insertions(+), 48 deletions(-) create mode 100644 src/test/java/net/snowflake/ingest/streaming/internal/ManyTablesIT.java diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java index ff879b8d6..cb1ef7810 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java @@ -350,6 +350,18 @@ void distributeFlushTasks() { if (!leftoverChannelsDataPerTable.isEmpty()) { channelsDataPerTable.addAll(leftoverChannelsDataPerTable); leftoverChannelsDataPerTable.clear(); + } else if (blobData.size() + >= this.owningClient + .getParameterProvider() + .getMaxChunksInBlobAndRegistrationRequest()) { + // Create a new blob if the current one already contains max allowed number of chunks + logger.logInfo( + "Max allowed number of chunks in the current blob reached. 
chunkCount={}" + + " maxChunkCount={} currentBlobPath={}", + blobData.size(), + this.owningClient.getParameterProvider().getMaxChunksInBlobAndRegistrationRequest(), + blobPath); + break; } else { ConcurrentHashMap> table = itr.next().getValue(); diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java index 5916e472d..27ff9407e 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java @@ -452,7 +452,53 @@ ChannelsStatusResponse getChannelsStatus( * @param blobs list of uploaded blobs */ void registerBlobs(List blobs) { - this.registerBlobs(blobs, 0); + for (List blobBatch : partitionBlobListForRegistrationRequest(blobs)) { + this.registerBlobs(blobBatch, 0); + } + } + + /** + * Partition the collection of blobs into sub-lists, so that the total number of chunks in each + * sublist does not exceed the max allowed number of chunks in one registration request. + */ + List> partitionBlobListForRegistrationRequest(List blobs) { + List> result = new ArrayList<>(); + List currentBatch = new ArrayList<>(); + int chunksInCurrentBatch = 0; + int maxChunksInBlobAndRegistrationRequest = + parameterProvider.getMaxChunksInBlobAndRegistrationRequest(); + + for (BlobMetadata blob : blobs) { + if (blob.getChunks().size() > maxChunksInBlobAndRegistrationRequest) { + throw new SFException( + ErrorCode.INTERNAL_ERROR, + String.format( + "Incorrectly generated blob detected - number of chunks in the blob is larger than" + + " the max allowed number of chunks. Please report this bug to Snowflake." + + " bdec=%s chunkCount=%d maxAllowedChunkCount=%d", + blob.getPath(), blob.getChunks().size(), maxChunksInBlobAndRegistrationRequest)); + } + + if (chunksInCurrentBatch + blob.getChunks().size() > maxChunksInBlobAndRegistrationRequest) { + // Newly added BDEC file would exceed the max number of chunks in a single registration + // request. We put chunks collected so far into the result list and create a new batch with + // the current blob + result.add(currentBatch); + currentBatch = new ArrayList<>(); + currentBatch.add(blob); + chunksInCurrentBatch = blob.getChunks().size(); + } else { + // Newly added BDEC can be added to the current batch because it does not exceed the max + // number of chunks in a single registration request, yet. 
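// For example, with the default limit of 100: blobs carrying 3, 95 and 2 chunks fit into a
// single registration request (exactly 100 chunks), a following 1-chunk blob opens a second
// request, and a 100-chunk blob then fills a third request on its own -- the same split the
// unit test later in this series asserts for createTestBlobMetadata(3, 95, 2, 1, 100).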
+ currentBatch.add(blob); + chunksInCurrentBatch += blob.getChunks().size(); + } + } + + if (!currentBatch.isEmpty()) { + result.add(currentBatch); + } + return result; } /** diff --git a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java index 5c6f81f66..3a2221697 100644 --- a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java +++ b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java @@ -31,6 +31,8 @@ public class ParameterProvider { public static final String MAX_CHUNK_SIZE_IN_BYTES = "MAX_CHUNK_SIZE_IN_BYTES".toLowerCase(); public static final String MAX_ALLOWED_ROW_SIZE_IN_BYTES = "MAX_ALLOWED_ROW_SIZE_IN_BYTES".toLowerCase(); + public static final String MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST = + "MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST".toLowerCase(); public static final String MAX_CLIENT_LAG = "MAX_CLIENT_LAG".toLowerCase(); @@ -59,6 +61,7 @@ public class ParameterProvider { static final long MAX_CLIENT_LAG_MS_MAX = TimeUnit.MINUTES.toMillis(10); public static final long MAX_ALLOWED_ROW_SIZE_IN_BYTES_DEFAULT = 64 * 1024 * 1024; // 64 MB + public static final int MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_DEFAULT = 100; /* Parameter that enables using internal Parquet buffers for buffering of rows before serializing. It reduces memory consumption compared to using Java Objects for buffering.*/ @@ -170,6 +173,11 @@ private void setParameterMap(Map parameterOverrides, Properties this.updateValue(MAX_CLIENT_LAG, MAX_CLIENT_LAG_DEFAULT, parameterOverrides, props); this.updateValue( MAX_CLIENT_LAG_ENABLED, MAX_CLIENT_LAG_ENABLED_DEFAULT, parameterOverrides, props); + this.updateValue( + MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST, + MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_DEFAULT, + parameterOverrides, + props); } /** @return Longest interval in milliseconds between buffer flushes */ @@ -369,6 +377,7 @@ public long getMaxChunkSizeInBytes() { return (val instanceof String) ? Long.parseLong(val.toString()) : (long) val; } + /** @return The max allow row size (in bytes) */ public long getMaxAllowedRowSizeInBytes() { Object val = this.parameterMap.getOrDefault( @@ -376,6 +385,18 @@ public long getMaxAllowedRowSizeInBytes() { return (val instanceof String) ? Long.parseLong(val.toString()) : (long) val; } + /** + * @return The max number of chunks that can be put into a single BDEC or blob registration + * request. + */ + public int getMaxChunksInBlobAndRegistrationRequest() { + Object val = + this.parameterMap.getOrDefault( + MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST, + MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_DEFAULT); + return (val instanceof String) ? 
Integer.parseInt(val.toString()) : (int) val; + } + @Override public String toString() { return "ParameterProvider{" + "parameterMap=" + parameterMap + '}'; diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java index 5a77a51db..17f929206 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java @@ -35,6 +35,7 @@ import java.util.List; import java.util.Map; import java.util.TimeZone; +import java.util.UUID; import java.util.concurrent.TimeUnit; import javax.crypto.BadPaddingException; import javax.crypto.IllegalBlockSizeException; @@ -273,7 +274,22 @@ TestContext>> create() { } } - TestContextFactory testContextFactory; + TestContextFactory>> testContextFactory; + + private SnowflakeStreamingIngestChannelInternal>> addChannel( + TestContext>> testContext, int tableId, long encryptionKeyId) { + return testContext + .channelBuilder("channel" + UUID.randomUUID()) + .setDBName("db1") + .setSchemaName("PUBLIC") + .setTableName("table" + tableId) + .setOffsetToken("offset1") + .setChannelSequencer(0L) + .setRowSequencer(0L) + .setEncryptionKey("key") + .setEncryptionKeyId(encryptionKeyId) + .buildAndAdd(); + } private SnowflakeStreamingIngestChannelInternal addChannel1(TestContext testContext) { return testContext @@ -546,6 +562,107 @@ public void testBlobSplitDueToChunkSizeLimit() throws Exception { Mockito.verify(flushService, Mockito.times(2)).buildAndUpload(Mockito.any(), Mockito.any()); } + @Test + public void testBlobSplitDueToNumberOfChunks() throws Exception { + for (int rowCount : Arrays.asList(0, 1, 30, 111, 159, 287, 1287, 1599, 4496)) { + runTestBlobSplitDueToNumberOfChunks(rowCount); + } + } + + /** + * Insert rows in batches of 3 into each table and assert that the expected number of blobs is + * generated. + * + * @param numberOfRows How many rows to insert + */ + public void runTestBlobSplitDueToNumberOfChunks(int numberOfRows) throws Exception { + int channelsPerTable = 3; + int expectedBlobs = + (int) + Math.ceil( + (double) numberOfRows + / channelsPerTable + / ParameterProvider.MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_DEFAULT); + + final TestContext>> testContext = testContextFactory.create(); + + for (int i = 0; i < numberOfRows; i++) { + SnowflakeStreamingIngestChannelInternal>> channel = + addChannel(testContext, i / channelsPerTable, 1); + channel.setupSchema(Collections.singletonList(createLargeTestTextColumn("C1"))); + channel.insertRow(Collections.singletonMap("C1", i), ""); + } + + FlushService>> flushService = testContext.flushService; + flushService.flush(true).get(); + + ArgumentCaptor>>>>> blobDataCaptor = + ArgumentCaptor.forClass(List.class); + Mockito.verify(flushService, Mockito.times(expectedBlobs)) + .buildAndUpload(Mockito.any(), blobDataCaptor.capture()); + + // 1. list => blobs; 2. list => chunks; 3. list => channels; 4. list => rows, 5. 
list => columns + List>>>>> allUploadedBlobs = + blobDataCaptor.getAllValues(); + + Assert.assertEquals(numberOfRows, getRows(allUploadedBlobs).size()); + } + + @Test + public void testBlobSplitDueToNumberOfChunksWithLeftoverChannels() throws Exception { + final TestContext>> testContext = testContextFactory.create(); + + for (int i = 0; i < 99; i++) { // 19 simple chunks + SnowflakeStreamingIngestChannelInternal>> channel = + addChannel(testContext, i, 1); + channel.setupSchema(Collections.singletonList(createLargeTestTextColumn("C1"))); + channel.insertRow(Collections.singletonMap("C1", i), ""); + } + + // 20th chunk would contain multiple channels, but there are some with different encryption key + // ID, so they spill to a new blob + SnowflakeStreamingIngestChannelInternal>> channel1 = + addChannel(testContext, 99, 1); + channel1.setupSchema(Collections.singletonList(createLargeTestTextColumn("C1"))); + channel1.insertRow(Collections.singletonMap("C1", 0), ""); + + SnowflakeStreamingIngestChannelInternal>> channel2 = + addChannel(testContext, 99, 2); + channel2.setupSchema(Collections.singletonList(createLargeTestTextColumn("C1"))); + channel2.insertRow(Collections.singletonMap("C1", 0), ""); + + SnowflakeStreamingIngestChannelInternal>> channel3 = + addChannel(testContext, 99, 2); + channel3.setupSchema(Collections.singletonList(createLargeTestTextColumn("C1"))); + channel3.insertRow(Collections.singletonMap("C1", 0), ""); + + FlushService>> flushService = testContext.flushService; + flushService.flush(true).get(); + + ArgumentCaptor>>>>> blobDataCaptor = + ArgumentCaptor.forClass(List.class); + Mockito.verify(flushService, Mockito.atLeast(2)) + .buildAndUpload(Mockito.any(), blobDataCaptor.capture()); + + // 1. list => blobs; 2. list => chunks; 3. list => channels; 4. list => rows, 5. 
list => columns + List>>>>> allUploadedBlobs = + blobDataCaptor.getAllValues(); + + Assert.assertEquals(102, getRows(allUploadedBlobs).size()); + } + + private List> getRows(List>>>>> blobs) { + List> result = new ArrayList<>(); + blobs.forEach( + chunks -> + chunks.forEach( + channels -> + channels.forEach( + chunkData -> + result.addAll(((ParquetChunkData) chunkData.getVectors()).rows)))); + return result; + } + @Test public void testBuildAndUpload() throws Exception { long expectedBuildLatencyMs = 100; diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/ManyTablesIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/ManyTablesIT.java new file mode 100644 index 000000000..d32adfe3f --- /dev/null +++ b/src/test/java/net/snowflake/ingest/streaming/internal/ManyTablesIT.java @@ -0,0 +1,98 @@ +package net.snowflake.ingest.streaming.internal; + +import static net.snowflake.ingest.utils.Constants.ROLE; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Collections; +import java.util.Map; +import java.util.Properties; +import net.snowflake.ingest.TestUtils; +import net.snowflake.ingest.streaming.OpenChannelRequest; +import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel; +import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient; +import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClientFactory; +import net.snowflake.ingest.utils.Constants; +import net.snowflake.ingest.utils.ParameterProvider; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Verified that ingestion work when we ingest into large number of tables from the same client and + * blobs and registration requests have to be cut, so they don't contain large number of chunks + */ +public class ManyTablesIT { + + private static final int TABLES_COUNT = 20; + private static final int TOTAL_ROWS_COUNT = 200_000; + private String dbName; + private SnowflakeStreamingIngestClient client; + private Connection connection; + private SnowflakeStreamingIngestChannel[] channels; + private String[] offsetTokensPerChannel; + + @Before + public void setUp() throws Exception { + Properties props = TestUtils.getProperties(Constants.BdecVersion.THREE, false); + props.put(ParameterProvider.MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST, 2); + if (props.getProperty(ROLE).equals("DEFAULT_ROLE")) { + props.setProperty(ROLE, "ACCOUNTADMIN"); + } + client = SnowflakeStreamingIngestClientFactory.builder("client1").setProperties(props).build(); + connection = TestUtils.getConnection(true); + dbName = String.format("sdk_it_many_tables_db_%d", System.nanoTime()); + + channels = new SnowflakeStreamingIngestChannel[TABLES_COUNT]; + offsetTokensPerChannel = new String[TABLES_COUNT]; + connection.createStatement().execute(String.format("create database %s;", dbName)); + + String[] tableNames = new String[TABLES_COUNT]; + for (int i = 0; i < tableNames.length; i++) { + tableNames[i] = String.format("table_%d", i); + connection.createStatement().execute(String.format("create table table_%d(c int);", i)); + channels[i] = + client.openChannel( + OpenChannelRequest.builder(String.format("channel-%d", i)) + .setDBName(dbName) + .setSchemaName("public") + .setTableName(tableNames[i]) + .setOnErrorOption(OpenChannelRequest.OnErrorOption.ABORT) + .build()); + } + } + + @After + public void tearDown() throws Exception { + connection.createStatement().execute(String.format("drop database %s;", dbName)); 
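// Clean-up order: dropping the per-test database removes the tables created in setUp,
// after which the streaming client and the JDBC connection are released.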
+ client.close(); + connection.close(); + } + + @Test + public void testIngestionIntoManyTables() throws InterruptedException, SQLException { + for (int i = 0; i < TOTAL_ROWS_COUNT; i++) { + Map row = Collections.singletonMap("c", i); + String offset = String.valueOf(i); + int channelId = i % channels.length; + channels[channelId].insertRow(row, offset); + offsetTokensPerChannel[channelId] = offset; + } + + for (int i = 0; i < channels.length; i++) { + TestUtils.waitForOffset(channels[i], offsetTokensPerChannel[i]); + } + + int totalRowsCount = 0; + ResultSet rs = + connection + .createStatement() + .executeQuery(String.format("show tables in database %s;", dbName)); + while (rs.next()) { + totalRowsCount += rs.getInt("rows"); + } + Assert.assertEquals(TOTAL_ROWS_COUNT, totalRowsCount); + } +} diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java index def5f7ecf..7838763de 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java @@ -272,4 +272,13 @@ public void testMaxClientLagEnabledThresholdAbove() { Assert.assertTrue(e.getMessage().startsWith("Lag falls outside")); } } + + @Test + public void testMaxChunksInBlobAndRegistrationRequest() { + Properties prop = new Properties(); + Map parameterMap = getStartingParameterMap(); + parameterMap.put("max_chunks_in_blob_and_registration_request", 1); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + Assert.assertEquals(1, parameterProvider.getMaxChunksInBlobAndRegistrationRequest()); + } } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java index a99054c9f..11fc0b93b 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java @@ -11,6 +11,8 @@ import static net.snowflake.ingest.utils.Constants.ROLE; import static net.snowflake.ingest.utils.Constants.USER; import static net.snowflake.ingest.utils.ParameterProvider.ENABLE_SNOWPIPE_STREAMING_METRICS; +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.when; import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.PropertyAccessor; @@ -319,11 +321,11 @@ public void testGetChannelsStatusWithRequest() throws Exception { CloseableHttpResponse httpResponse = Mockito.mock(CloseableHttpResponse.class); StatusLine statusLine = Mockito.mock(StatusLine.class); HttpEntity httpEntity = Mockito.mock(HttpEntity.class); - Mockito.when(statusLine.getStatusCode()).thenReturn(200); - Mockito.when(httpResponse.getStatusLine()).thenReturn(statusLine); - Mockito.when(httpResponse.getEntity()).thenReturn(httpEntity); - Mockito.when(httpEntity.getContent()).thenReturn(IOUtils.toInputStream(responseString)); - Mockito.when(httpClient.execute(Mockito.any())).thenReturn(httpResponse); + when(statusLine.getStatusCode()).thenReturn(200); + when(httpResponse.getStatusLine()).thenReturn(statusLine); + when(httpResponse.getEntity()).thenReturn(httpEntity); + when(httpEntity.getContent()).thenReturn(IOUtils.toInputStream(responseString)); + 
when(httpClient.execute(Mockito.any())).thenReturn(httpResponse); RequestBuilder requestBuilder = Mockito.spy( @@ -378,11 +380,11 @@ public void testGetChannelsStatusWithRequestError() throws Exception { CloseableHttpResponse httpResponse = Mockito.mock(CloseableHttpResponse.class); StatusLine statusLine = Mockito.mock(StatusLine.class); HttpEntity httpEntity = Mockito.mock(HttpEntity.class); - Mockito.when(statusLine.getStatusCode()).thenReturn(500); - Mockito.when(httpResponse.getStatusLine()).thenReturn(statusLine); - Mockito.when(httpResponse.getEntity()).thenReturn(httpEntity); - Mockito.when(httpEntity.getContent()).thenReturn(IOUtils.toInputStream(responseString)); - Mockito.when(httpClient.execute(Mockito.any())).thenReturn(httpResponse); + when(statusLine.getStatusCode()).thenReturn(500); + when(httpResponse.getStatusLine()).thenReturn(statusLine); + when(httpResponse.getEntity()).thenReturn(httpEntity); + when(httpEntity.getContent()).thenReturn(IOUtils.toInputStream(responseString)); + when(httpClient.execute(Mockito.any())).thenReturn(httpResponse); RequestBuilder requestBuilder = Mockito.spy( @@ -645,12 +647,12 @@ public void testRegisterBlobErrorResponse() throws Exception { CloseableHttpResponse httpResponse = Mockito.mock(CloseableHttpResponse.class); StatusLine statusLine = Mockito.mock(StatusLine.class); HttpEntity httpEntity = Mockito.mock(HttpEntity.class); - Mockito.when(statusLine.getStatusCode()).thenReturn(500); - Mockito.when(httpResponse.getStatusLine()).thenReturn(statusLine); - Mockito.when(httpResponse.getEntity()).thenReturn(httpEntity); + when(statusLine.getStatusCode()).thenReturn(500); + when(httpResponse.getStatusLine()).thenReturn(statusLine); + when(httpResponse.getEntity()).thenReturn(httpEntity); String response = "testRegisterBlobErrorResponse"; - Mockito.when(httpEntity.getContent()).thenReturn(IOUtils.toInputStream(response)); - Mockito.when(httpClient.execute(Mockito.any())).thenReturn(httpResponse); + when(httpEntity.getContent()).thenReturn(IOUtils.toInputStream(response)); + when(httpClient.execute(Mockito.any())).thenReturn(httpResponse); RequestBuilder requestBuilder = new RequestBuilder(TestUtils.getHost(), TestUtils.getUser(), TestUtils.getKeyPair()); @@ -693,11 +695,11 @@ public void testRegisterBlobSnowflakeInternalErrorResponse() throws Exception { CloseableHttpResponse httpResponse = Mockito.mock(CloseableHttpResponse.class); StatusLine statusLine = Mockito.mock(StatusLine.class); HttpEntity httpEntity = Mockito.mock(HttpEntity.class); - Mockito.when(statusLine.getStatusCode()).thenReturn(200); - Mockito.when(httpResponse.getStatusLine()).thenReturn(statusLine); - Mockito.when(httpResponse.getEntity()).thenReturn(httpEntity); - Mockito.when(httpEntity.getContent()).thenReturn(IOUtils.toInputStream(response)); - Mockito.when(httpClient.execute(Mockito.any())).thenReturn(httpResponse); + when(statusLine.getStatusCode()).thenReturn(200); + when(httpResponse.getStatusLine()).thenReturn(statusLine); + when(httpResponse.getEntity()).thenReturn(httpEntity); + when(httpEntity.getContent()).thenReturn(IOUtils.toInputStream(response)); + when(httpClient.execute(Mockito.any())).thenReturn(httpResponse); RequestBuilder requestBuilder = new RequestBuilder(TestUtils.getHost(), TestUtils.getUser(), TestUtils.getKeyPair()); @@ -749,11 +751,11 @@ public void testRegisterBlobSuccessResponse() throws Exception { CloseableHttpResponse httpResponse = Mockito.mock(CloseableHttpResponse.class); StatusLine statusLine = Mockito.mock(StatusLine.class); HttpEntity 
httpEntity = Mockito.mock(HttpEntity.class); - Mockito.when(statusLine.getStatusCode()).thenReturn(200); - Mockito.when(httpResponse.getStatusLine()).thenReturn(statusLine); - Mockito.when(httpResponse.getEntity()).thenReturn(httpEntity); - Mockito.when(httpEntity.getContent()).thenReturn(IOUtils.toInputStream(response)); - Mockito.when(httpClient.execute(Mockito.any())).thenReturn(httpResponse); + when(statusLine.getStatusCode()).thenReturn(200); + when(httpResponse.getStatusLine()).thenReturn(statusLine); + when(httpResponse.getEntity()).thenReturn(httpEntity); + when(httpEntity.getContent()).thenReturn(IOUtils.toInputStream(response)); + when(httpClient.execute(Mockito.any())).thenReturn(httpResponse); RequestBuilder requestBuilder = new RequestBuilder(TestUtils.getHost(), TestUtils.getUser(), TestUtils.getKeyPair()); @@ -827,16 +829,16 @@ public void testRegisterBlobsRetries() throws Exception { CloseableHttpResponse httpResponse = Mockito.mock(CloseableHttpResponse.class); StatusLine statusLine = Mockito.mock(StatusLine.class); HttpEntity httpEntity = Mockito.mock(HttpEntity.class); - Mockito.when(statusLine.getStatusCode()).thenReturn(200); - Mockito.when(httpResponse.getStatusLine()).thenReturn(statusLine); - Mockito.when(httpResponse.getEntity()).thenReturn(httpEntity); - Mockito.when(httpEntity.getContent()) + when(statusLine.getStatusCode()).thenReturn(200); + when(httpResponse.getStatusLine()).thenReturn(statusLine); + when(httpResponse.getEntity()).thenReturn(httpEntity); + when(httpEntity.getContent()) .thenReturn( IOUtils.toInputStream(responseString), IOUtils.toInputStream(retryResponseString), IOUtils.toInputStream(retryResponseString), IOUtils.toInputStream(retryResponseString)); - Mockito.when(httpClient.execute(Mockito.any())).thenReturn(httpResponse); + when(httpClient.execute(Mockito.any())).thenReturn(httpResponse); RequestBuilder requestBuilder = Mockito.spy( @@ -862,6 +864,92 @@ public void testRegisterBlobsRetries() throws Exception { Assert.assertFalse(channel2.isValid()); } + @Test + public void testRegisterBlobChunkLimit() throws Exception { + CloseableHttpClient httpClient = Mockito.mock(CloseableHttpClient.class); + RequestBuilder requestBuilder = + Mockito.spy( + new RequestBuilder(TestUtils.getHost(), TestUtils.getUser(), TestUtils.getKeyPair())); + + SnowflakeStreamingIngestClientInternal client = + Mockito.spy( + new SnowflakeStreamingIngestClientInternal<>( + "client", + new SnowflakeURL("snowflake.dev.local:8082"), + null, + httpClient, + true, + requestBuilder, + null)); + + assertEquals(0, client.partitionBlobListForRegistrationRequest(new ArrayList<>()).size()); + assertEquals( + 1, client.partitionBlobListForRegistrationRequest(createTestBlobMetadata(1)).size()); + assertEquals( + 1, client.partitionBlobListForRegistrationRequest(createTestBlobMetadata(99)).size()); + assertEquals( + 1, client.partitionBlobListForRegistrationRequest(createTestBlobMetadata(100)).size()); + + assertEquals( + 1, client.partitionBlobListForRegistrationRequest(createTestBlobMetadata(3, 95, 2)).size()); + assertEquals( + 2, + client.partitionBlobListForRegistrationRequest(createTestBlobMetadata(3, 95, 2, 1)).size()); + assertEquals( + 3, + client + .partitionBlobListForRegistrationRequest(createTestBlobMetadata(3, 95, 2, 1, 100)) + .size()); + assertEquals( + 2, client.partitionBlobListForRegistrationRequest(createTestBlobMetadata(99, 2)).size()); + assertEquals( + 2, + client + .partitionBlobListForRegistrationRequest(createTestBlobMetadata(55, 44, 2, 98)) + .size()); + 
assertEquals( + 3, + client + .partitionBlobListForRegistrationRequest(createTestBlobMetadata(55, 44, 2, 99)) + .size()); + assertEquals( + 3, + client + .partitionBlobListForRegistrationRequest(createTestBlobMetadata(55, 44, 2, 99, 1)) + .size()); + } + + /** + * Generate blob metadata with specified number of chunks per blob + * + * @param numbersOfChunks Array of chunk numbers per blob + * @return List of blob metadata + */ + private List createTestBlobMetadata(int... numbersOfChunks) { + List result = new ArrayList<>(); + for (int n : numbersOfChunks) { + List chunkMetadata = new ArrayList<>(); + for (int i = 0; i < n; i++) { + ChunkMetadata chunk = + ChunkMetadata.builder() + .setOwningTableFromChannelContext(channel1.getChannelContext()) + .setChunkStartOffset(0L) + .setChunkLength(1) + .setEncryptionKeyId(0L) + .setChunkMD5("") + .setEpInfo(new EpInfo()) + .setChannelList(new ArrayList<>()) + .setFirstInsertTimeInMs(0L) + .setLastInsertTimeInMs(0L) + .build(); + chunkMetadata.add(chunk); + } + + result.add(new BlobMetadata("", "", chunkMetadata, new BlobStats())); + } + return result; + } + @Test public void testRegisterBlobsRetriesSucceeds() throws Exception { Pair, Set> testData = getRetryBlobMetadata(); @@ -945,13 +1033,13 @@ public void testRegisterBlobsRetriesSucceeds() throws Exception { CloseableHttpResponse httpResponse = Mockito.mock(CloseableHttpResponse.class); StatusLine statusLine = Mockito.mock(StatusLine.class); HttpEntity httpEntity = Mockito.mock(HttpEntity.class); - Mockito.when(statusLine.getStatusCode()).thenReturn(200); - Mockito.when(httpResponse.getStatusLine()).thenReturn(statusLine); - Mockito.when(httpResponse.getEntity()).thenReturn(httpEntity); - Mockito.when(httpEntity.getContent()) + when(statusLine.getStatusCode()).thenReturn(200); + when(httpResponse.getStatusLine()).thenReturn(statusLine); + when(httpResponse.getEntity()).thenReturn(httpEntity); + when(httpEntity.getContent()) .thenReturn( IOUtils.toInputStream(responseString), IOUtils.toInputStream(retryResponseString)); - Mockito.when(httpClient.execute(Mockito.any())).thenReturn(httpResponse); + when(httpClient.execute(Mockito.any())).thenReturn(httpResponse); RequestBuilder requestBuilder = Mockito.spy( @@ -1022,11 +1110,11 @@ public void testRegisterBlobResponseWithInvalidChannel() throws Exception { CloseableHttpResponse httpResponse = Mockito.mock(CloseableHttpResponse.class); StatusLine statusLine = Mockito.mock(StatusLine.class); HttpEntity httpEntity = Mockito.mock(HttpEntity.class); - Mockito.when(statusLine.getStatusCode()).thenReturn(200); - Mockito.when(httpResponse.getStatusLine()).thenReturn(statusLine); - Mockito.when(httpResponse.getEntity()).thenReturn(httpEntity); - Mockito.when(httpEntity.getContent()).thenReturn(IOUtils.toInputStream(response)); - Mockito.when(httpClient.execute(Mockito.any())).thenReturn(httpResponse); + when(statusLine.getStatusCode()).thenReturn(200); + when(httpResponse.getStatusLine()).thenReturn(statusLine); + when(httpResponse.getEntity()).thenReturn(httpEntity); + when(httpEntity.getContent()).thenReturn(IOUtils.toInputStream(response)); + when(httpClient.execute(Mockito.any())).thenReturn(httpResponse); RequestBuilder requestBuilder = new RequestBuilder(TestUtils.getHost(), TestUtils.getUser(), TestUtils.getKeyPair()); @@ -1146,7 +1234,7 @@ public void testCloseWithError() throws Exception { CompletableFuture future = new CompletableFuture<>(); future.completeExceptionally(new Exception("Simulating Error")); - 
Mockito.when(client.flush(true)).thenReturn(future); + when(client.flush(true)).thenReturn(future); Assert.assertFalse(client.isClosed()); try { @@ -1249,11 +1337,11 @@ public void testGetLatestCommittedOffsetTokens() throws Exception { CloseableHttpResponse httpResponse = Mockito.mock(CloseableHttpResponse.class); StatusLine statusLine = Mockito.mock(StatusLine.class); HttpEntity httpEntity = Mockito.mock(HttpEntity.class); - Mockito.when(statusLine.getStatusCode()).thenReturn(200); - Mockito.when(httpResponse.getStatusLine()).thenReturn(statusLine); - Mockito.when(httpResponse.getEntity()).thenReturn(httpEntity); - Mockito.when(httpEntity.getContent()).thenReturn(IOUtils.toInputStream(responseString)); - Mockito.when(httpClient.execute(Mockito.any())).thenReturn(httpResponse); + when(statusLine.getStatusCode()).thenReturn(200); + when(httpResponse.getStatusLine()).thenReturn(statusLine); + when(httpResponse.getEntity()).thenReturn(httpEntity); + when(httpEntity.getContent()).thenReturn(IOUtils.toInputStream(responseString)); + when(httpClient.execute(Mockito.any())).thenReturn(httpResponse); RequestBuilder requestBuilder = Mockito.spy( From 72ff74777fb476f8327b1245d2ff50a75e228deb Mon Sep 17 00:00:00 2001 From: Jiazhen Fan <52474868+sfc-gh-jfan@users.noreply.github.com> Date: Wed, 18 Oct 2023 15:34:46 -0700 Subject: [PATCH 03/10] PRODSEC-3611 fix GHA parsing (#603) --- .github/workflows/snyk-issue.yml | 5 +++++ .github/workflows/snyk-pr.yml | 6 ++++++ 2 files changed, 11 insertions(+) diff --git a/.github/workflows/snyk-issue.yml b/.github/workflows/snyk-issue.yml index b586554dd..2a3f6226a 100644 --- a/.github/workflows/snyk-issue.yml +++ b/.github/workflows/snyk-issue.yml @@ -4,6 +4,11 @@ on: schedule: - cron: '* */12 * * *' +permissions: + contents: read + issues: write + pull-requests: write + concurrency: snyk-issue jobs: diff --git a/.github/workflows/snyk-pr.yml b/.github/workflows/snyk-pr.yml index 1ef32b622..4cb65c098 100644 --- a/.github/workflows/snyk-pr.yml +++ b/.github/workflows/snyk-pr.yml @@ -3,6 +3,12 @@ on: pull_request: branches: - master + +permissions: + contents: read + issues: write + pull-requests: write + jobs: snyk: runs-on: ubuntu-latest From c690d568853205f7398faa8df8b3577c429dcf7a Mon Sep 17 00:00:00 2001 From: Lukas Sembera Date: Thu, 19 Oct 2023 10:44:38 +0200 Subject: [PATCH 04/10] no-snow Sort pom.xml in format.sh (#604) --- format.sh | 3 +++ 1 file changed, 3 insertions(+) diff --git a/format.sh b/format.sh index 0cade08f5..cb58bd995 100755 --- a/format.sh +++ b/format.sh @@ -19,3 +19,6 @@ if ! command -v java > /dev/null; then fi echo "Running Google Java Format" find ./src -type f -name "*.java" -print0 | xargs -0 java -jar "${JAR_FILE}" --replace --set-exit-if-changed && echo "OK" + +echo "Sorting pom.xml" +mvn com.github.ekryd.sortpom:sortpom-maven-plugin:sort From 7dd67842d84732aad02cb05f75de51cc5bbb5a79 Mon Sep 17 00:00:00 2001 From: Lukas Sembera Date: Thu, 19 Oct 2023 11:29:01 +0200 Subject: [PATCH 05/10] SNOW-936900 Use upgraded version of snappy-java (#605) The SDK is exporting an old version of its transitive dependency snappy-java . This is visible in the new e2e jar project, which is pulling in the snappy-java defined in hadoop-common and not the version we define in dependencyManagement. This PR makes snappy-java a direct dependency, so we can control the version. 
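In pom.xml terms, the fix boils down to a direct dependency declaration along the following lines, with the resolved version still governed by the SDK's dependencyManagement section (an illustrative sketch of the addition; the actual diff follows):

    <dependency>
      <groupId>org.xerial.snappy</groupId>
      <artifactId>snappy-java</artifactId>
      <scope>runtime</scope>
    </dependency>

Since Maven's nearest-wins dependency mediation prefers a direct dependency over the transitive one coming from hadoop-common, this lets the SDK control which snappy-java version is exported.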
--- pom.xml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pom.xml b/pom.xml index fc4a2ab81..8a11ccb45 100644 --- a/pom.xml +++ b/pom.xml @@ -473,6 +473,12 @@ org.slf4j slf4j-api + + org.xerial.snappy + snappy-java + runtime + + junit From 38d30c66a3bf5e366687360ff694e1713b22ee0f Mon Sep 17 00:00:00 2001 From: Lukas Sembera Date: Thu, 19 Oct 2023 21:23:40 +0200 Subject: [PATCH 06/10] SNOW-945927 SNOW-945928 Fix Snyk vulnerabilities (#641) --- e2e-jar-test/pom.xml | 6 ++++++ pom.xml | 5 +++++ 2 files changed, 11 insertions(+) diff --git a/e2e-jar-test/pom.xml b/e2e-jar-test/pom.xml index 9368308fc..1050d0967 100644 --- a/e2e-jar-test/pom.xml +++ b/e2e-jar-test/pom.xml @@ -36,6 +36,12 @@ 3.13.30 + + org.bouncycastle + bc-fips + 1.0.2.4 + + org.slf4j slf4j-simple diff --git a/pom.xml b/pom.xml index 8a11ccb45..9789e9e09 100644 --- a/pom.xml +++ b/pom.xml @@ -473,6 +473,11 @@ org.slf4j slf4j-api + + com.google.protobuf + protobuf-java + runtime + org.xerial.snappy snappy-java From 9cf82ba17fdf67ac1c2dfc6097a07281718317e5 Mon Sep 17 00:00:00 2001 From: Toby Zhang Date: Thu, 19 Oct 2023 14:55:24 -0700 Subject: [PATCH 07/10] NO-SNOW: fix row index issue for ON_ERROR=SKIP_BATCH (#606) We need to use a separate variable to store the index in the original batch instead of tempRowcount, added a test to verify the behavior --- .../streaming/internal/AbstractRowBuffer.java | 10 ++-- .../streaming/internal/RowBufferTest.java | 55 +++++++++++++++---- 2 files changed, 49 insertions(+), 16 deletions(-) diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java index 3029e7a49..766a24763 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java @@ -223,13 +223,14 @@ public InsertValidationResponse insertRows( float rowsSizeInBytes = 0F; float tempRowsSizeInBytes = 0F; int tempRowCount = 0; + int rowIndex = 0; for (Map row : rows) { InsertValidationResponse.InsertError error = - new InsertValidationResponse.InsertError(row, tempRowCount); + new InsertValidationResponse.InsertError(row, rowIndex); try { - Set inputColumnNames = verifyInputColumns(row, error, tempRowCount); + Set inputColumnNames = verifyInputColumns(row, error, rowIndex); tempRowsSizeInBytes += - addTempRow(row, tempRowCount, rowBuffer.tempStatsMap, inputColumnNames, tempRowCount); + addTempRow(row, tempRowCount, rowBuffer.tempStatsMap, inputColumnNames, rowIndex); tempRowCount++; } catch (SFException e) { error.setException(e); @@ -239,8 +240,9 @@ public InsertValidationResponse insertRows( error.setException(new SFException(e, ErrorCode.INTERNAL_ERROR, e.getMessage())); response.addError(error); } + rowIndex++; checkBatchSizeEnforcedMaximum(tempRowsSizeInBytes); - if ((long) rowBuffer.bufferedRowCount + tempRowCount >= Integer.MAX_VALUE) { + if ((long) rowBuffer.bufferedRowCount + rowIndex >= Integer.MAX_VALUE) { throw new SFException(ErrorCode.INTERNAL_ERROR, "Row count reaches MAX value"); } } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java index 269b6f28d..9ee1d29cb 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java @@ -14,7 +14,6 @@ import java.util.LinkedHashMap; import 
java.util.List; import java.util.Map; -import java.util.Objects; import net.snowflake.ingest.streaming.InsertValidationResponse; import net.snowflake.ingest.streaming.OpenChannelRequest; import net.snowflake.ingest.utils.Constants; @@ -267,40 +266,54 @@ public void testInvalidPhysicalType() { public void testStringLength() { testStringLengthHelper(this.rowBufferOnErrorContinue); testStringLengthHelper(this.rowBufferOnErrorAbort); + testStringLengthHelper(this.rowBufferOnErrorSkipBatch); } @Test public void testRowIndexWithMultipleRowsWithError() { + testRowIndexWithMultipleRowsWithErrorHelper(this.rowBufferOnErrorContinue); + testRowIndexWithMultipleRowsWithErrorHelper(this.rowBufferOnErrorSkipBatch); + } + + public void testRowIndexWithMultipleRowsWithErrorHelper(AbstractRowBuffer rowBuffer) { List> rows = new ArrayList<>(); Map row = new HashMap<>(); - // row with good data + // row with interleaved good and bad data row.put("colInt", 3); rows.add(row); row = new HashMap<>(); row.put("colChar", "1111111111111111111111"); // too big + rows.add(row); + + row = new HashMap<>(); + row.put("colInt", 3); + rows.add(row); - // lets add a row with bad data + row = new HashMap<>(); + row.put("colChar", "1111111111111111111111"); // too big rows.add(row); - InsertValidationResponse response = this.rowBufferOnErrorContinue.insertRows(rows, null); + InsertValidationResponse response = rowBuffer.insertRows(rows, null); Assert.assertTrue(response.hasErrors()); - Assert.assertEquals(1, response.getErrorRowCount()); + Assert.assertEquals(2, response.getErrorRowCount()); // second row out of the rows we sent was having bad data. // so InsertError corresponds to second row. Assert.assertEquals(1, response.getInsertErrors().get(0).getRowIndex()); + Assert.assertEquals(3, response.getInsertErrors().get(1).getRowIndex()); - Assert.assertTrue(response.getInsertErrors().get(0).getException() != null); + Assert.assertNotNull(response.getInsertErrors().get(0).getException()); + Assert.assertNotNull(response.getInsertErrors().get(1).getException()); - Assert.assertTrue( - Objects.equals( - response.getInsertErrors().get(0).getException().getVendorCode(), - ErrorCode.INVALID_VALUE_ROW.getMessageCode())); - - Assert.assertEquals(1, response.getInsertErrors().get(0).getRowIndex()); + Assert.assertEquals( + response.getInsertErrors().get(0).getException().getVendorCode(), + ErrorCode.INVALID_VALUE_ROW.getMessageCode()); + Assert.assertEquals( + response.getInsertErrors().get(1).getException().getVendorCode(), + ErrorCode.INVALID_VALUE_ROW.getMessageCode()); Assert.assertTrue( response @@ -313,6 +326,17 @@ public void testRowIndexWithMultipleRowsWithError() { + " Value cannot be ingested into Snowflake column COLCHAR of type STRING," + " rowIndex:1, reason: String too long: length=22 characters maxLength=11" + " characters")); + Assert.assertTrue( + response + .getInsertErrors() + .get(1) + .getException() + .getMessage() + .equalsIgnoreCase( + "The given row cannot be converted to the internal format due to invalid value:" + + " Value cannot be ingested into Snowflake column COLCHAR of type STRING," + + " rowIndex:3, reason: String too long: length=22 characters maxLength=11" + + " characters")); } private void testStringLengthHelper(AbstractRowBuffer rowBuffer) { @@ -357,6 +381,7 @@ private void testStringLengthHelper(AbstractRowBuffer rowBuffer) { public void testInsertRow() { testInsertRowHelper(this.rowBufferOnErrorContinue); testInsertRowHelper(this.rowBufferOnErrorAbort); + 
testInsertRowHelper(this.rowBufferOnErrorSkipBatch); } private void testInsertRowHelper(AbstractRowBuffer rowBuffer) { @@ -377,6 +402,7 @@ private void testInsertRowHelper(AbstractRowBuffer rowBuffer) { public void testNullInsertRow() { testInsertNullRowHelper(this.rowBufferOnErrorContinue); testInsertNullRowHelper(this.rowBufferOnErrorAbort); + testInsertNullRowHelper(this.rowBufferOnErrorSkipBatch); } private void testInsertNullRowHelper(AbstractRowBuffer rowBuffer) { @@ -397,6 +423,7 @@ private void testInsertNullRowHelper(AbstractRowBuffer rowBuffer) { public void testInsertRows() { testInsertRowsHelper(this.rowBufferOnErrorContinue); testInsertRowsHelper(this.rowBufferOnErrorAbort); + testInsertRowsHelper(this.rowBufferOnErrorSkipBatch); } private void testInsertRowsHelper(AbstractRowBuffer rowBuffer) { @@ -426,6 +453,7 @@ private void testInsertRowsHelper(AbstractRowBuffer rowBuffer) { public void testFlush() { testFlushHelper(this.rowBufferOnErrorAbort); testFlushHelper(this.rowBufferOnErrorContinue); + testFlushHelper(this.rowBufferOnErrorSkipBatch); } private void testFlushHelper(AbstractRowBuffer rowBuffer) { @@ -598,6 +626,7 @@ public void testInvalidEPInfo() { public void testE2E() { testE2EHelper(this.rowBufferOnErrorAbort); testE2EHelper(this.rowBufferOnErrorContinue); + testE2EHelper(this.rowBufferOnErrorSkipBatch); } private void testE2EHelper(AbstractRowBuffer rowBuffer) { @@ -626,6 +655,7 @@ private void testE2EHelper(AbstractRowBuffer rowBuffer) { public void testE2ETimestampErrors() { testE2ETimestampErrorsHelper(this.rowBufferOnErrorAbort); testE2ETimestampErrorsHelper(this.rowBufferOnErrorContinue); + testE2ETimestampErrorsHelper(this.rowBufferOnErrorSkipBatch); } private void testE2ETimestampErrorsHelper(AbstractRowBuffer innerBuffer) { @@ -663,6 +693,7 @@ private void testE2ETimestampErrorsHelper(AbstractRowBuffer innerBuffer) { public void testStatsE2E() { testStatsE2EHelper(this.rowBufferOnErrorAbort); testStatsE2EHelper(this.rowBufferOnErrorContinue); + testStatsE2EHelper(this.rowBufferOnErrorSkipBatch); } private void testStatsE2EHelper(AbstractRowBuffer rowBuffer) { From c5650252c6cf0ec900a4d0655151a539f9672a23 Mon Sep 17 00:00:00 2001 From: Toby Zhang Date: Mon, 23 Oct 2023 22:57:22 -0700 Subject: [PATCH 08/10] V2.0.4 release (#643) v2.0.4 release, all the additional changes are done by sorting the pom file, looks like this is added as part of #604 --- e2e-jar-test/pom.xml | 104 +++++++++--------- pom.xml | 26 ++--- .../ingest/connection/RequestBuilder.java | 3 +- 3 files changed, 67 insertions(+), 66 deletions(-) diff --git a/e2e-jar-test/pom.xml b/e2e-jar-test/pom.xml index 1050d0967..9a140aaeb 100644 --- a/e2e-jar-test/pom.xml +++ b/e2e-jar-test/pom.xml @@ -1,65 +1,65 @@ - 4.0.0 + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + 4.0.0 - net.snowflake.snowflake-ingest-java-e2e-jar-test - parent - 1.0-SNAPSHOT - pom + net.snowflake.snowflake-ingest-java-e2e-jar-test + parent + 1.0-SNAPSHOT + pom - snowflake-ingest-sdk-e2e-test + snowflake-ingest-sdk-e2e-test - - standard - fips - core - + + standard + fips + core + - + - - - net.snowflake.snowflake-ingest-java-e2e-jar-test - core - ${project.version} - + + + net.snowflake.snowflake-ingest-java-e2e-jar-test + core + ${project.version} + - - net.snowflake - snowflake-ingest-sdk - 2.0.4-SNAPSHOT - + + net.snowflake + snowflake-ingest-sdk + 2.0.4 + - - net.snowflake - snowflake-jdbc-fips - 3.13.30 - + + net.snowflake + snowflake-jdbc-fips + 3.13.30 + - - 
org.bouncycastle - bc-fips - 1.0.2.4 - + + org.bouncycastle + bc-fips + 1.0.2.4 + - - org.slf4j - slf4j-simple - 1.7.36 - + + org.slf4j + slf4j-simple + 1.7.36 + - - com.fasterxml.jackson.core - jackson-databind - 2.14.0 - + + com.fasterxml.jackson.core + jackson-databind + 2.14.0 + - - junit - junit - 4.13.2 - test - - - + + junit + junit + 4.13.2 + test + + + diff --git a/pom.xml b/pom.xml index 9789e9e09..90f5ccbb5 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ net.snowflake snowflake-ingest-sdk - 2.0.4-SNAPSHOT + 2.0.4 jar Snowflake Ingest SDK Snowflake Ingest SDK @@ -779,9 +779,9 @@ failFast + The list of allowed licenses. If you see the build failing due to "There are some forbidden licenses used, please + check your dependencies", verify the conditions of the license and add the reference to it here. + --> Apache License 2.0 BSD 2-Clause License @@ -899,9 +899,9 @@ + Copy all project dependencies to target/dependency-jars. License processing Python script will look here for + license files of SDK dependencies. + --> org.apache.maven.plugins maven-dependency-plugin @@ -923,9 +923,9 @@ + Compile the list of SDK dependencies in 'compile' and 'runtime' scopes. + This list is an entry point for the license processing python script. + --> org.apache.maven.plugins maven-dependency-plugin @@ -1108,9 +1108,9 @@ + Plugin executes license processing Python script, which copies third party license files into the directory + target/generated-licenses-info/META-INF/third-party-licenses, which is then included in the shaded JAR. + --> org.codehaus.mojo exec-maven-plugin diff --git a/src/main/java/net/snowflake/ingest/connection/RequestBuilder.java b/src/main/java/net/snowflake/ingest/connection/RequestBuilder.java index 6936bc52a..77443e7b8 100644 --- a/src/main/java/net/snowflake/ingest/connection/RequestBuilder.java +++ b/src/main/java/net/snowflake/ingest/connection/RequestBuilder.java @@ -110,7 +110,7 @@ public class RequestBuilder { // Don't change! 
public static final String CLIENT_NAME = "SnowpipeJavaSDK"; - public static final String DEFAULT_VERSION = "2.0.4-SNAPSHOT"; + public static final String DEFAULT_VERSION = "2.0.4"; public static final String JAVA_USER_AGENT = "JAVA"; @@ -351,6 +351,7 @@ private static String getDefaultUserAgent() { private static String buildCustomUserAgent(String additionalUserAgentInfo) { return USER_AGENT.trim() + " " + additionalUserAgentInfo; } + /** A simple POJO for generating our POST body to the insert endpoint */ private static class IngestRequest { // the list of files we're loading From 4450fc45fc99a5fb830598fa7f7eee705fa00cf5 Mon Sep 17 00:00:00 2001 From: Alkin Sen <120425561+sfc-gh-asen@users.noreply.github.com> Date: Wed, 25 Oct 2023 16:01:36 -0700 Subject: [PATCH 09/10] SNOW-949967 Add an optional offset token parameter for openChannel (#645) --- .../ingest/streaming/OpenChannelRequest.java | 22 ++++++++++++ ...nowflakeStreamingIngestClientInternal.java | 3 ++ .../SnowflakeStreamingIngestChannelTest.java | 19 ++++++++++ .../streaming/internal/StreamingIngestIT.java | 35 +++++++++++++++++++ 4 files changed, 79 insertions(+) diff --git a/src/main/java/net/snowflake/ingest/streaming/OpenChannelRequest.java b/src/main/java/net/snowflake/ingest/streaming/OpenChannelRequest.java index f48288330..d30472b63 100644 --- a/src/main/java/net/snowflake/ingest/streaming/OpenChannelRequest.java +++ b/src/main/java/net/snowflake/ingest/streaming/OpenChannelRequest.java @@ -40,6 +40,9 @@ public enum OnErrorOption { // Default timezone for TIMESTAMP_LTZ and TIMESTAMP_TZ columns private final ZoneId defaultTimezone; + private final String offsetToken; + private final boolean isOffsetTokenProvided; + public static OpenChannelRequestBuilder builder(String channelName) { return new OpenChannelRequestBuilder(channelName); } @@ -53,6 +56,9 @@ public static class OpenChannelRequestBuilder { private OnErrorOption onErrorOption; private ZoneId defaultTimezone; + private String offsetToken; + private boolean isOffsetTokenProvided = false; + public OpenChannelRequestBuilder(String channelName) { this.channelName = channelName; this.defaultTimezone = DEFAULT_DEFAULT_TIMEZONE; @@ -83,6 +89,12 @@ public OpenChannelRequestBuilder setDefaultTimezone(ZoneId defaultTimezone) { return this; } + public OpenChannelRequestBuilder setOffsetToken(String offsetToken){ + this.offsetToken = offsetToken; + this.isOffsetTokenProvided = true; + return this; + } + public OpenChannelRequest build() { return new OpenChannelRequest(this); } @@ -102,6 +114,8 @@ private OpenChannelRequest(OpenChannelRequestBuilder builder) { this.tableName = builder.tableName; this.onErrorOption = builder.onErrorOption; this.defaultTimezone = builder.defaultTimezone; + this.offsetToken = builder.offsetToken; + this.isOffsetTokenProvided = builder.isOffsetTokenProvided; } public String getDBName() { @@ -131,4 +145,12 @@ public String getFullyQualifiedTableName() { public OnErrorOption getOnErrorOption() { return this.onErrorOption; } + + public String getOffsetToken() { + return this.offsetToken; + } + + public boolean isOffsetTokenProvided() { + return this.isOffsetTokenProvided; + } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java index 27ff9407e..a52149331 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java +++ 
b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java @@ -304,6 +304,9 @@ public SnowflakeStreamingIngestChannelInternal openChannel(OpenChannelRequest payload.put("schema", request.getSchemaName()); payload.put("write_mode", Constants.WriteMode.CLOUD_STORAGE.name()); payload.put("role", this.role); + if (request.isOffsetTokenProvided()){ + payload.put("offset_token", request.getOffsetToken()); + } OpenChannelResponse response = executeWithRetries( diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java index a1e26b63d..037028086 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java @@ -263,6 +263,25 @@ public void testOpenChannelRequestCreationSuccess() { Assert.assertEquals( "STREAMINGINGEST_TEST.PUBLIC.T_STREAMINGINGEST", request.getFullyQualifiedTableName()); + Assert.assertFalse(request.isOffsetTokenProvided()); + } + + + @Test + public void testOpenChannelRequesCreationtWithOffsetToken() { + OpenChannelRequest request = + OpenChannelRequest.builder("CHANNEL") + .setDBName("STREAMINGINGEST_TEST") + .setSchemaName("PUBLIC") + .setTableName("T_STREAMINGINGEST") + .setOnErrorOption(OpenChannelRequest.OnErrorOption.CONTINUE) + .setOffsetToken("TEST_TOKEN") + .build(); + + Assert.assertEquals( + "STREAMINGINGEST_TEST.PUBLIC.T_STREAMINGINGEST", request.getFullyQualifiedTableName()); + Assert.assertEquals("TEST_TOKEN", request.getOffsetToken()); + Assert.assertTrue(request.isOffsetTokenProvided()); } @Test diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestIT.java index 95cc5c699..eadbe81ee 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/StreamingIngestIT.java @@ -532,6 +532,41 @@ public void testMultiColumnIngest() throws Exception { Assert.fail("Row sequencer not updated before timeout"); } + @Test + public void testOpenChannelOffsetToken() throws Exception { + String tableName = "offsetTokenTest"; + jdbcConnection + .createStatement() + .execute( + String.format( + "create or replace table %s (s text);", + tableName)); + OpenChannelRequest request1 = + OpenChannelRequest.builder("TEST_CHANNEL") + .setDBName(testDb) + .setSchemaName(TEST_SCHEMA) + .setTableName(tableName) + .setOnErrorOption(OpenChannelRequest.OnErrorOption.CONTINUE) + .setOffsetToken("TEST_OFFSET") + .build(); + + // Open a streaming ingest channel from the given client + SnowflakeStreamingIngestChannel channel1 = client.openChannel(request1); + + // Close the channel after insertion + channel1.close().get(); + + for (int i = 1; i < 15; i++) { + if (channel1.getLatestCommittedOffsetToken() != null + && channel1.getLatestCommittedOffsetToken().equals("TEST_OFFSET")) { + return; + } else { + Thread.sleep(2000); + } + } + Assert.fail("Row sequencer not updated before timeout"); + } + @Test public void testNullableColumns() throws Exception { String multiTableName = "multi_column"; From d0fd1d8e084ee3f98f8fdbdbff126c3843ca5dbb Mon Sep 17 00:00:00 2001 From: Gloria Doci Date: Tue, 31 Oct 2023 12:54:51 +0100 Subject: [PATCH 10/10] SNOW-672156 support specifying compression 
From d0fd1d8e084ee3f98f8fdbdbff126c3843ca5dbb Mon Sep 17 00:00:00 2001 From: Gloria Doci Date: Tue, 31 Oct 2023 12:54:51 +0100 Subject: [PATCH 10/10] SNOW-672156 support specifying compression algorithm to be used for BDEC Parquet files. GZIP is the only allowed value for now. (#579) --- .../internal/ClientBufferParameters.java | 23 ++++++++-- .../streaming/internal/ParquetFlusher.java | 16 ++++++- .../streaming/internal/ParquetRowBuffer.java | 6 ++- .../net/snowflake/ingest/utils/Constants.java | 24 +++++++++++ .../ingest/utils/ParameterProvider.java | 24 ++++++++++- .../parquet/hadoop/BdecParquetWriter.java | 7 ++-- .../internal/ParameterProviderTest.java | 42 +++++++++++++++++++ .../streaming/internal/RowBufferTest.java | 3 +- 8 files changed, 133 insertions(+), 12 deletions(-) diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java index dffd824c8..278d4abea 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ClientBufferParameters.java @@ -4,6 +4,7 @@ package net.snowflake.ingest.streaming.internal; +import net.snowflake.ingest.utils.Constants; import net.snowflake.ingest.utils.ParameterProvider; /** Channel's buffer relevant parameters that are set at the owning client level. */ @@ -15,6 +16,8 @@ public class ClientBufferParameters { private long maxAllowedRowSizeInBytes; + private Constants.BdecParquetCompression bdecParquetCompression; + /** * Private constructor used for test methods * @@ -26,10 +29,12 @@ public class ClientBufferParameters { private ClientBufferParameters( boolean enableParquetInternalBuffering, long maxChunkSizeInBytes, - long maxAllowedRowSizeInBytes) { + long maxAllowedRowSizeInBytes, + Constants.BdecParquetCompression bdecParquetCompression) { this.enableParquetInternalBuffering = enableParquetInternalBuffering; this.maxChunkSizeInBytes = maxChunkSizeInBytes; this.maxAllowedRowSizeInBytes = maxAllowedRowSizeInBytes; + this.bdecParquetCompression = bdecParquetCompression; } /** @param clientInternal reference to the client object where the relevant parameters are set */ @@ -46,6 +51,10 @@ public ClientBufferParameters(SnowflakeStreamingIngestClientInternal clientInter clientInternal != null ? clientInternal.getParameterProvider().getMaxAllowedRowSizeInBytes() : ParameterProvider.MAX_ALLOWED_ROW_SIZE_IN_BYTES_DEFAULT; + this.bdecParquetCompression = + clientInternal != null + ? 
clientInternal.getParameterProvider().getBdecParquetCompressionAlgorithm() + : ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT; } /** @@ -58,9 +67,13 @@ public ClientBufferParameters(SnowflakeStreamingIngestClientInternal clientInter public static ClientBufferParameters test_createClientBufferParameters( boolean enableParquetInternalBuffering, long maxChunkSizeInBytes, - long maxAllowedRowSizeInBytes) { + long maxAllowedRowSizeInBytes, + Constants.BdecParquetCompression bdecParquetCompression) { return new ClientBufferParameters( - enableParquetInternalBuffering, maxChunkSizeInBytes, maxAllowedRowSizeInBytes); + enableParquetInternalBuffering, + maxChunkSizeInBytes, + maxAllowedRowSizeInBytes, + bdecParquetCompression); } public boolean getEnableParquetInternalBuffering() { @@ -74,4 +87,8 @@ public long getMaxChunkSizeInBytes() { public long getMaxAllowedRowSizeInBytes() { return maxAllowedRowSizeInBytes; } + + public Constants.BdecParquetCompression getBdecParquetCompression() { + return bdecParquetCompression; + } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java index 87c30207d..e81dbc0eb 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetFlusher.java @@ -9,6 +9,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import net.snowflake.ingest.utils.Constants; import net.snowflake.ingest.utils.ErrorCode; import net.snowflake.ingest.utils.Logging; import net.snowflake.ingest.utils.Pair; @@ -27,15 +28,21 @@ public class ParquetFlusher implements Flusher { private final boolean enableParquetInternalBuffering; private final long maxChunkSizeInBytes; + private final Constants.BdecParquetCompression bdecParquetCompression; + /** * Construct parquet flusher from its schema and set flag that indicates whether Parquet memory * optimization is enabled, i.e. rows will be buffered in internal Parquet buffer. 
*/ public ParquetFlusher( - MessageType schema, boolean enableParquetInternalBuffering, long maxChunkSizeInBytes) { + MessageType schema, + boolean enableParquetInternalBuffering, + long maxChunkSizeInBytes, + Constants.BdecParquetCompression bdecParquetCompression) { this.schema = schema; this.enableParquetInternalBuffering = enableParquetInternalBuffering; this.maxChunkSizeInBytes = maxChunkSizeInBytes; + this.bdecParquetCompression = bdecParquetCompression; } @Override @@ -198,7 +205,12 @@ private SerializationResult serializeFromJavaObjects( Map metadata = channelsDataPerTable.get(0).getVectors().metadata; parquetWriter = new BdecParquetWriter( - mergedData, schema, metadata, firstChannelFullyQualifiedTableName, maxChunkSizeInBytes); + mergedData, + schema, + metadata, + firstChannelFullyQualifiedTableName, + maxChunkSizeInBytes, + bdecParquetCompression); rows.forEach(parquetWriter::writeRow); parquetWriter.close(); diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java index 6fb7b8027..602e34400 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java @@ -122,7 +122,8 @@ private void createFileWriter() { schema, metadata, channelName, - clientBufferParameters.getMaxChunkSizeInBytes()); + clientBufferParameters.getMaxChunkSizeInBytes(), + clientBufferParameters.getBdecParquetCompression()); } else { this.bdecParquetWriter = null; } @@ -323,7 +324,8 @@ public Flusher createFlusher() { return new ParquetFlusher( schema, clientBufferParameters.getEnableParquetInternalBuffering(), - clientBufferParameters.getMaxChunkSizeInBytes()); + clientBufferParameters.getMaxChunkSizeInBytes(), + clientBufferParameters.getBdecParquetCompression()); } private static class ParquetColumn { diff --git a/src/main/java/net/snowflake/ingest/utils/Constants.java b/src/main/java/net/snowflake/ingest/utils/Constants.java index 09814235b..a22bc9f21 100644 --- a/src/main/java/net/snowflake/ingest/utils/Constants.java +++ b/src/main/java/net/snowflake/ingest/utils/Constants.java @@ -5,6 +5,7 @@ package net.snowflake.ingest.utils; import java.util.Arrays; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; /** Contains all the constants needed for Streaming Ingest */ public class Constants { @@ -113,6 +114,29 @@ public static BdecVersion fromInt(int val) { } } + /** + * Compression algorithm supported by BDEC Parquet Writer. It is a wrapper around Parquet's lib + * CompressionCodecName, but we want to control and allow only specific values of that. 
+ */ + public enum BdecParquetCompression { + GZIP; + + public CompressionCodecName getCompressionCodec() { + return CompressionCodecName.fromConf(this.name()); + } + + public static BdecParquetCompression fromName(String name) { + for (BdecParquetCompression e : BdecParquetCompression.values()) { + if (e.name().toLowerCase().equals(name.toLowerCase())) { + return e; + } + } + throw new IllegalArgumentException( + String.format( + "Unsupported BDEC_PARQUET_COMPRESSION_ALGORITHM = '%s', allowed values are %s", + name, Arrays.asList(BdecParquetCompression.values()))); + } + } // Parameters public static final boolean DISABLE_BACKGROUND_FLUSH = false; public static final boolean COMPRESS_BLOB_TWICE = false; diff --git a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java index 3a2221697..82dd1164b 100644 --- a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java +++ b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java @@ -38,6 +38,9 @@ public class ParameterProvider { public static final String MAX_CLIENT_LAG_ENABLED = "MAX_CLIENT_LAG_ENABLED".toLowerCase(); + public static final String BDEC_PARQUET_COMPRESSION_ALGORITHM = + "BDEC_PARQUET_COMPRESSION_ALGORITHM".toLowerCase(); + // Default values public static final long BUFFER_FLUSH_INTERVAL_IN_MILLIS_DEFAULT = 1000; public static final long BUFFER_FLUSH_CHECK_INTERVAL_IN_MILLIS_DEFAULT = 100; @@ -63,6 +66,9 @@ public class ParameterProvider { public static final long MAX_ALLOWED_ROW_SIZE_IN_BYTES_DEFAULT = 64 * 1024 * 1024; // 64 MB public static final int MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_DEFAULT = 100; + public static final Constants.BdecParquetCompression BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT = + Constants.BdecParquetCompression.GZIP; + /* Parameter that enables using internal Parquet buffers for buffering of rows before serializing. It reduces memory consumption compared to using Java Objects for buffering.*/ public static final boolean ENABLE_PARQUET_INTERNAL_BUFFERING_DEFAULT = false; @@ -178,6 +184,12 @@ private void setParameterMap(Map parameterOverrides, Properties MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_DEFAULT, parameterOverrides, props); + + this.updateValue( + BDEC_PARQUET_COMPRESSION_ALGORITHM, + BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT, + parameterOverrides, + props); } /** @return Longest interval in milliseconds between buffer flushes */ @@ -377,7 +389,6 @@ public long getMaxChunkSizeInBytes() { return (val instanceof String) ? Long.parseLong(val.toString()) : (long) val; } - /** @return The max allow row size (in bytes) */ public long getMaxAllowedRowSizeInBytes() { Object val = this.parameterMap.getOrDefault( @@ -397,6 +408,17 @@ public int getMaxChunksInBlobAndRegistrationRequest() { return (val instanceof String) ? 
Integer.parseInt(val.toString()) : (int) val; } + /** @return BDEC compression algorithm */ + public Constants.BdecParquetCompression getBdecParquetCompressionAlgorithm() { + Object val = + this.parameterMap.getOrDefault( + BDEC_PARQUET_COMPRESSION_ALGORITHM, BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT); + if (val instanceof Constants.BdecParquetCompression) { + return (Constants.BdecParquetCompression) val; + } + return Constants.BdecParquetCompression.fromName((String) val); + } + @Override public String toString() { return "ParameterProvider{" + "parameterMap=" + parameterMap + '}';
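Before the writer-level changes that follow, it may help to see how the new parameter is resolved. The sketch below is illustrative only and drives ParameterProvider directly, the same way the new unit tests further down do. ParameterProvider is an internal class; in application code the BDEC_PARQUET_COMPRESSION_ALGORITHM key would normally arrive through the parameter overrides handed to the client at construction time (an assumption here, not shown in this patch). Per this patch, GZIP is the only accepted value, matched case-insensitively.

import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.ParameterProvider;

public class BdecCompressionParameterSketch {
  public static void main(String[] args) {
    // Illustrative sketch: resolves the new parameter the same way the unit tests do.
    // "gzip", "GZIP", "Gzip", ... all resolve to the GZIP enum constant
    Map<String, Object> overrides =
        Collections.singletonMap(ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM, "gzip");
    ParameterProvider provider = new ParameterProvider(overrides, new Properties());

    Constants.BdecParquetCompression codec = provider.getBdecParquetCompressionAlgorithm();
    System.out.println(codec);                       // GZIP
    System.out.println(codec.getCompressionCodec()); // Parquet's CompressionCodecName.GZIP

    // Any name outside the enum fails fast with an IllegalArgumentException
    try {
      Constants.BdecParquetCompression.fromName("snappy");
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}

Keeping the allowed values behind the BdecParquetCompression wrapper, rather than exposing Parquet's CompressionCodecName directly, is what lets the SDK reject unsupported codecs at configuration time, in line with the comment added to Constants above.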
diff --git a/src/main/java/org/apache/parquet/hadoop/BdecParquetWriter.java index b2442d3dc..8b71cfd0e 100644 --- a/src/main/java/org/apache/parquet/hadoop/BdecParquetWriter.java +++ b/src/main/java/org/apache/parquet/hadoop/BdecParquetWriter.java @@ -17,7 +17,6 @@ import org.apache.parquet.column.values.factory.DefaultV1ValuesWriterFactory; import org.apache.parquet.crypto.FileEncryptionProperties; import org.apache.parquet.hadoop.api.WriteSupport; -import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.io.DelegatingPositionOutputStream; import org.apache.parquet.io.OutputFile; import org.apache.parquet.io.ParquetEncodingException; @@ -51,7 +50,8 @@ public BdecParquetWriter( MessageType schema, Map extraMetaData, String channelName, - long maxChunkSizeInBytes) + long maxChunkSizeInBytes, + Constants.BdecParquetCompression bdecParquetCompression) throws IOException { OutputFile file = new ByteArrayOutputFile(stream, maxChunkSizeInBytes); ParquetProperties encodingProps = createParquetProperties(); @@ -86,7 +86,8 @@ public BdecParquetWriter( */ codecFactory = new CodecFactory(conf, ParquetWriter.DEFAULT_PAGE_SIZE); @SuppressWarnings("deprecation") // Parquet does not support the new one now - CodecFactory.BytesCompressor compressor = codecFactory.getCompressor(CompressionCodecName.GZIP); + CodecFactory.BytesCompressor compressor = + codecFactory.getCompressor(bdecParquetCompression.getCompressionCodec()); writer = new InternalParquetRecordWriter<>( fileWriter, diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java index 7838763de..8499fc985 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java @@ -1,8 +1,11 @@ package net.snowflake.ingest.streaming.internal; +import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Properties; +import net.snowflake.ingest.utils.Constants; import net.snowflake.ingest.utils.ParameterProvider; import org.junit.Assert; import org.junit.Test; @@ -20,6 +23,7 @@ private Map getStartingParameterMap() { parameterMap.put(ParameterProvider.BLOB_UPLOAD_MAX_RETRY_COUNT, 100); parameterMap.put(ParameterProvider.MAX_MEMORY_LIMIT_IN_BYTES, 1000L); parameterMap.put(ParameterProvider.MAX_CHANNEL_SIZE_IN_BYTES, 1000000L); + parameterMap.put(ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM, "gzip"); return parameterMap; } @@ -39,6 +43,9 @@ public void withValuesSet() { Assert.assertEquals(100, parameterProvider.getBlobUploadMaxRetryCount()); Assert.assertEquals(1000L, parameterProvider.getMaxMemoryLimitInBytes()); Assert.assertEquals(1000000L, parameterProvider.getMaxChannelSizeInBytes()); + Assert.assertEquals( + Constants.BdecParquetCompression.GZIP, + parameterProvider.getBdecParquetCompressionAlgorithm()); } @Test @@ -130,6 +137,9 @@ public void withDefaultValues() { Assert.assertEquals( ParameterProvider.MAX_CHANNEL_SIZE_IN_BYTES_DEFAULT, parameterProvider.getMaxChannelSizeInBytes()); + Assert.assertEquals( + ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM_DEFAULT, + parameterProvider.getBdecParquetCompressionAlgorithm()); } @Test @@ -281,4 +291,36 @@ public void testMaxChunksInBlobAndRegistrationRequest() { ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); Assert.assertEquals(1, parameterProvider.getMaxChunksInBlobAndRegistrationRequest()); } + + @Test + public void testValidCompressionAlgorithmsAndWithUppercaseLowerCase() { + List gzipValues = Arrays.asList("GZIP", "gzip", "Gzip", "gZip"); + gzipValues.forEach( + v -> { + Properties prop = new Properties(); + Map parameterMap = getStartingParameterMap(); + parameterMap.put(ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM, v); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + Assert.assertEquals( + Constants.BdecParquetCompression.GZIP, + parameterProvider.getBdecParquetCompressionAlgorithm()); + }); + } + + @Test + public void testInvalidCompressionAlgorithm() { + Properties prop = new Properties(); + Map parameterMap = getStartingParameterMap(); + parameterMap.put(ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM, "invalid_comp"); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + try { + parameterProvider.getBdecParquetCompressionAlgorithm(); + Assert.fail("Should not have succeeded"); + } catch (IllegalArgumentException e) { + Assert.assertEquals( + "Unsupported BDEC_PARQUET_COMPRESSION_ALGORITHM = 'invalid_comp', allowed values are" + + " [GZIP]", + e.getMessage()); + } + } } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java index 9ee1d29cb..52d8eed2e 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java @@ -120,7 +120,8 @@ private AbstractRowBuffer createTestBuffer(OpenChannelRequest.OnErrorOption o ClientBufferParameters.test_createClientBufferParameters( enableParquetMemoryOptimization, MAX_CHUNK_SIZE_IN_BYTES_DEFAULT, - MAX_ALLOWED_ROW_SIZE_IN_BYTES_DEFAULT)); + MAX_ALLOWED_ROW_SIZE_IN_BYTES_DEFAULT, + Constants.BdecParquetCompression.GZIP)); } @Test