From df04e8c3884c39d3d8041a5d362288a5bcf3ee7a Mon Sep 17 00:00:00 2001 From: Lukas Sembera Date: Tue, 29 Aug 2023 14:46:23 +0000 Subject: [PATCH] SNOW-902709 Limit the max allowed number of chunks in blob --- .../streaming/internal/FlushService.java | 10 ++ ...nowflakeStreamingIngestClientInternal.java | 37 +++- .../ingest/utils/ParameterProvider.java | 23 +++ .../streaming/internal/FlushServiceTest.java | 31 ++++ .../SnowflakeStreamingIngestClientTest.java | 167 +++++++++++++----- 5 files changed, 221 insertions(+), 47 deletions(-) diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java index 8016f9b5c..93ef9a2b7 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java @@ -351,6 +351,16 @@ void distributeFlushTasks() { if (!leftoverChannelsDataPerTable.isEmpty()) { channelsDataPerTable.addAll(leftoverChannelsDataPerTable); leftoverChannelsDataPerTable.clear(); + } else if (blobData.size() + >= this.owningClient.getParameterProvider().getMaxChunksInBlob()) { + // Create a new blob if the current one already contains max allowed number of chunks + logger.logInfo( + "Max allowed number of chunks in the current blob reached. chunkCount={}" + + " maxChunkCount={} currentBlobPath={}", + blobData.size(), + this.owningClient.getParameterProvider().getMaxChunksInBlob(), + blobPath); + break; } else { ConcurrentHashMap> table = itr.next().getValue(); diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java index 5916e472d..2b72fb51c 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java @@ -452,7 +452,42 @@ ChannelsStatusResponse getChannelsStatus( * @param blobs list of uploaded blobs */ void registerBlobs(List blobs) { - this.registerBlobs(blobs, 0); + for (List blobBatch : partitionBlobListForRegistrationRequest(blobs)) { + this.registerBlobs(blobBatch, 0); + } + } + + /** + * Partition the collection of blobs into sub-lists, so that the total number of chunks in each + * sublist does not exceed the max allowed number of chunks in one registration request. + */ + List> partitionBlobListForRegistrationRequest(List blobs) { + List> result = new ArrayList<>(); + List currentBatch = new ArrayList<>(); + int chunksInCurrentBatch = 0; + + for (BlobMetadata blob : blobs) { + if (chunksInCurrentBatch + blob.getChunks().size() + > parameterProvider.getMaxChunksInRegistrationRequest()) { + // Newly added BDEC file would exceed the max number of chunks in a single registration + // request. We put chunks collected so far into the result list and create a new batch with + // the current blob + result.add(currentBatch); + currentBatch = new ArrayList<>(); + currentBatch.add(blob); + chunksInCurrentBatch = blob.getChunks().size(); + } else { + // Newly added BDEC can be added to the current batch because it does not exceed the max + // number of chunks in a single registration request, yet. + currentBatch.add(blob); + chunksInCurrentBatch += blob.getChunks().size(); + } + } + + if (!currentBatch.isEmpty()) { + result.add(currentBatch); + } + return result; } /** diff --git a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java index 7e4c3d042..1f378b1c6 100644 --- a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java +++ b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java @@ -30,6 +30,9 @@ public class ParameterProvider { public static final String MAX_CHUNK_SIZE_IN_BYTES = "MAX_CHUNK_SIZE_IN_BYTES".toLowerCase(); public static final String MAX_ALLOWED_ROW_SIZE_IN_BYTES = "MAX_ALLOWED_ROW_SIZE_IN_BYTES".toLowerCase(); + public static final String MAX_CHUNKS_IN_BLOB = "MAX_CHUNKS_IN_BDEC".toLowerCase(); + public static final String MAX_CHUNKS_IN_REGISTRATION_REQUEST = + "MAX_CHUNKS_IN_REGISTRATION_REQUEST".toLowerCase(); // Default values public static final long BUFFER_FLUSH_INTERVAL_IN_MILLIS_DEFAULT = 1000; @@ -46,6 +49,8 @@ public class ParameterProvider { public static final long MAX_CHANNEL_SIZE_IN_BYTES_DEFAULT = 32000000L; public static final long MAX_CHUNK_SIZE_IN_BYTES_DEFAULT = 128000000L; public static final long MAX_ALLOWED_ROW_SIZE_IN_BYTES_DEFAULT = 64 * 1024 * 1024; // 64 MB + public static final int MAX_CHUNKS_IN_BLOB_DEFAULT = 20; + public static final int MAX_CHUNKS_IN_REGISTRATION_REQUEST_DEFAULT = 100; /* Parameter that enables using internal Parquet buffers for buffering of rows before serializing. It reduces memory consumption compared to using Java Objects for buffering.*/ @@ -289,6 +294,7 @@ public long getMaxChunkSizeInBytes() { return (val instanceof String) ? Long.parseLong(val.toString()) : (long) val; } + /** @return The max allow row size (in bytes) */ public long getMaxAllowedRowSizeInBytes() { Object val = this.parameterMap.getOrDefault( @@ -296,6 +302,23 @@ public long getMaxAllowedRowSizeInBytes() { return (val instanceof String) ? Long.parseLong(val.toString()) : (long) val; } + /** @return The max number of chunks that can be put into a single BDEC file */ + public int getMaxChunksInBlob() { + Object val = this.parameterMap.getOrDefault(MAX_CHUNKS_IN_BLOB, MAX_CHUNKS_IN_BLOB_DEFAULT); + return (val instanceof String) ? Integer.parseInt(val.toString()) : (int) val; + } + + /** + * @return The max number of chunks that can be put into a single BDEC registration request. Must + * be higher than MAX_CHUNKS_IN_BDEC. + */ + public int getMaxChunksInRegistrationRequest() { + Object val = + this.parameterMap.getOrDefault( + MAX_CHUNKS_IN_REGISTRATION_REQUEST, MAX_CHUNKS_IN_REGISTRATION_REQUEST_DEFAULT); + return (val instanceof String) ? Integer.parseInt(val.toString()) : (int) val; + } + @Override public String toString() { return "ParameterProvider{" + "parameterMap=" + parameterMap + '}'; diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java index 0b8e8b4cd..36bc35626 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java @@ -35,6 +35,7 @@ import java.util.List; import java.util.Map; import java.util.TimeZone; +import java.util.UUID; import java.util.concurrent.TimeUnit; import javax.crypto.BadPaddingException; import javax.crypto.IllegalBlockSizeException; @@ -282,6 +283,21 @@ public void setup() { java.security.Security.addProvider(new BouncyCastleProvider()); } + private SnowflakeStreamingIngestChannelInternal addChannel( + TestContext testContext, int tableId) { + return testContext + .channelBuilder("channel" + UUID.randomUUID()) + .setDBName("db1") + .setSchemaName("PUBLIC") + .setTableName("table" + tableId) + .setOffsetToken("offset1") + .setChannelSequencer(0L) + .setRowSequencer(0L) + .setEncryptionKey("key") + .setEncryptionKeyId(1L) + .buildAndAdd(); + } + private SnowflakeStreamingIngestChannelInternal addChannel1(TestContext testContext) { return testContext .channelBuilder("channel1") @@ -553,6 +569,21 @@ public void testBlobSplitDueToChunkSizeLimit() throws Exception { Mockito.verify(flushService, Mockito.times(2)).buildAndUpload(Mockito.any(), Mockito.any()); } + @Test + public void testBlobSplitDueToNumberOfChunks() throws Exception { + TestContext testContext = testContextFactory.create(); + + for (int i = 0; i < 1111; i++) { + SnowflakeStreamingIngestChannelInternal channel = addChannel(testContext, i / 3); + channel.setupSchema(Collections.singletonList(createLargeTestTextColumn("C1"))); + channel.insertRow(Collections.singletonMap("C1", i), ""); + } + + FlushService flushService = testContext.flushService; + flushService.flush(true).get(); + Mockito.verify(flushService, Mockito.times(19)).buildAndUpload(Mockito.any(), Mockito.any()); + } + @Test public void testBuildAndUpload() throws Exception { long expectedBuildLatencyMs = 100; diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java index ec9f671bb..8ea452cbc 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java @@ -11,6 +11,8 @@ import static net.snowflake.ingest.utils.Constants.ROLE; import static net.snowflake.ingest.utils.Constants.USER; import static net.snowflake.ingest.utils.ParameterProvider.ENABLE_SNOWPIPE_STREAMING_METRICS; +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.when; import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.PropertyAccessor; @@ -322,11 +324,11 @@ public void testGetChannelsStatusWithRequest() throws Exception { CloseableHttpResponse httpResponse = Mockito.mock(CloseableHttpResponse.class); StatusLine statusLine = Mockito.mock(StatusLine.class); HttpEntity httpEntity = Mockito.mock(HttpEntity.class); - Mockito.when(statusLine.getStatusCode()).thenReturn(200); - Mockito.when(httpResponse.getStatusLine()).thenReturn(statusLine); - Mockito.when(httpResponse.getEntity()).thenReturn(httpEntity); - Mockito.when(httpEntity.getContent()).thenReturn(IOUtils.toInputStream(responseString)); - Mockito.when(httpClient.execute(Mockito.any())).thenReturn(httpResponse); + when(statusLine.getStatusCode()).thenReturn(200); + when(httpResponse.getStatusLine()).thenReturn(statusLine); + when(httpResponse.getEntity()).thenReturn(httpEntity); + when(httpEntity.getContent()).thenReturn(IOUtils.toInputStream(responseString)); + when(httpClient.execute(Mockito.any())).thenReturn(httpResponse); RequestBuilder requestBuilder = Mockito.spy( @@ -381,11 +383,11 @@ public void testGetChannelsStatusWithRequestError() throws Exception { CloseableHttpResponse httpResponse = Mockito.mock(CloseableHttpResponse.class); StatusLine statusLine = Mockito.mock(StatusLine.class); HttpEntity httpEntity = Mockito.mock(HttpEntity.class); - Mockito.when(statusLine.getStatusCode()).thenReturn(500); - Mockito.when(httpResponse.getStatusLine()).thenReturn(statusLine); - Mockito.when(httpResponse.getEntity()).thenReturn(httpEntity); - Mockito.when(httpEntity.getContent()).thenReturn(IOUtils.toInputStream(responseString)); - Mockito.when(httpClient.execute(Mockito.any())).thenReturn(httpResponse); + when(statusLine.getStatusCode()).thenReturn(500); + when(httpResponse.getStatusLine()).thenReturn(statusLine); + when(httpResponse.getEntity()).thenReturn(httpEntity); + when(httpEntity.getContent()).thenReturn(IOUtils.toInputStream(responseString)); + when(httpClient.execute(Mockito.any())).thenReturn(httpResponse); RequestBuilder requestBuilder = Mockito.spy( @@ -648,12 +650,12 @@ public void testRegisterBlobErrorResponse() throws Exception { CloseableHttpResponse httpResponse = Mockito.mock(CloseableHttpResponse.class); StatusLine statusLine = Mockito.mock(StatusLine.class); HttpEntity httpEntity = Mockito.mock(HttpEntity.class); - Mockito.when(statusLine.getStatusCode()).thenReturn(500); - Mockito.when(httpResponse.getStatusLine()).thenReturn(statusLine); - Mockito.when(httpResponse.getEntity()).thenReturn(httpEntity); + when(statusLine.getStatusCode()).thenReturn(500); + when(httpResponse.getStatusLine()).thenReturn(statusLine); + when(httpResponse.getEntity()).thenReturn(httpEntity); String response = "testRegisterBlobErrorResponse"; - Mockito.when(httpEntity.getContent()).thenReturn(IOUtils.toInputStream(response)); - Mockito.when(httpClient.execute(Mockito.any())).thenReturn(httpResponse); + when(httpEntity.getContent()).thenReturn(IOUtils.toInputStream(response)); + when(httpClient.execute(Mockito.any())).thenReturn(httpResponse); RequestBuilder requestBuilder = new RequestBuilder(TestUtils.getHost(), TestUtils.getUser(), TestUtils.getKeyPair()); @@ -696,11 +698,11 @@ public void testRegisterBlobSnowflakeInternalErrorResponse() throws Exception { CloseableHttpResponse httpResponse = Mockito.mock(CloseableHttpResponse.class); StatusLine statusLine = Mockito.mock(StatusLine.class); HttpEntity httpEntity = Mockito.mock(HttpEntity.class); - Mockito.when(statusLine.getStatusCode()).thenReturn(200); - Mockito.when(httpResponse.getStatusLine()).thenReturn(statusLine); - Mockito.when(httpResponse.getEntity()).thenReturn(httpEntity); - Mockito.when(httpEntity.getContent()).thenReturn(IOUtils.toInputStream(response)); - Mockito.when(httpClient.execute(Mockito.any())).thenReturn(httpResponse); + when(statusLine.getStatusCode()).thenReturn(200); + when(httpResponse.getStatusLine()).thenReturn(statusLine); + when(httpResponse.getEntity()).thenReturn(httpEntity); + when(httpEntity.getContent()).thenReturn(IOUtils.toInputStream(response)); + when(httpClient.execute(Mockito.any())).thenReturn(httpResponse); RequestBuilder requestBuilder = new RequestBuilder(TestUtils.getHost(), TestUtils.getUser(), TestUtils.getKeyPair()); @@ -752,11 +754,11 @@ public void testRegisterBlobSuccessResponse() throws Exception { CloseableHttpResponse httpResponse = Mockito.mock(CloseableHttpResponse.class); StatusLine statusLine = Mockito.mock(StatusLine.class); HttpEntity httpEntity = Mockito.mock(HttpEntity.class); - Mockito.when(statusLine.getStatusCode()).thenReturn(200); - Mockito.when(httpResponse.getStatusLine()).thenReturn(statusLine); - Mockito.when(httpResponse.getEntity()).thenReturn(httpEntity); - Mockito.when(httpEntity.getContent()).thenReturn(IOUtils.toInputStream(response)); - Mockito.when(httpClient.execute(Mockito.any())).thenReturn(httpResponse); + when(statusLine.getStatusCode()).thenReturn(200); + when(httpResponse.getStatusLine()).thenReturn(statusLine); + when(httpResponse.getEntity()).thenReturn(httpEntity); + when(httpEntity.getContent()).thenReturn(IOUtils.toInputStream(response)); + when(httpClient.execute(Mockito.any())).thenReturn(httpResponse); RequestBuilder requestBuilder = new RequestBuilder(TestUtils.getHost(), TestUtils.getUser(), TestUtils.getKeyPair()); @@ -830,16 +832,16 @@ public void testRegisterBlobsRetries() throws Exception { CloseableHttpResponse httpResponse = Mockito.mock(CloseableHttpResponse.class); StatusLine statusLine = Mockito.mock(StatusLine.class); HttpEntity httpEntity = Mockito.mock(HttpEntity.class); - Mockito.when(statusLine.getStatusCode()).thenReturn(200); - Mockito.when(httpResponse.getStatusLine()).thenReturn(statusLine); - Mockito.when(httpResponse.getEntity()).thenReturn(httpEntity); - Mockito.when(httpEntity.getContent()) + when(statusLine.getStatusCode()).thenReturn(200); + when(httpResponse.getStatusLine()).thenReturn(statusLine); + when(httpResponse.getEntity()).thenReturn(httpEntity); + when(httpEntity.getContent()) .thenReturn( IOUtils.toInputStream(responseString), IOUtils.toInputStream(retryResponseString), IOUtils.toInputStream(retryResponseString), IOUtils.toInputStream(retryResponseString)); - Mockito.when(httpClient.execute(Mockito.any())).thenReturn(httpResponse); + when(httpClient.execute(Mockito.any())).thenReturn(httpResponse); RequestBuilder requestBuilder = Mockito.spy( @@ -865,6 +867,79 @@ public void testRegisterBlobsRetries() throws Exception { Assert.assertFalse(channel2.isValid()); } + @Test + public void testRegisterBlobChunkLimit() throws Exception { + CloseableHttpClient httpClient = Mockito.mock(CloseableHttpClient.class); + RequestBuilder requestBuilder = + Mockito.spy( + new RequestBuilder(TestUtils.getHost(), TestUtils.getUser(), TestUtils.getKeyPair())); + + SnowflakeStreamingIngestClientInternal client = + Mockito.spy( + new SnowflakeStreamingIngestClientInternal<>( + "client", + new SnowflakeURL("snowflake.dev.local:8082"), + null, + httpClient, + true, + requestBuilder, + null)); + + assertEquals(0, client.partitionBlobListForRegistrationRequest(new ArrayList<>()).size()); + assertEquals( + 1, client.partitionBlobListForRegistrationRequest(createTestBlobMetadata(99)).size()); + assertEquals( + 2, client.partitionBlobListForRegistrationRequest(createTestBlobMetadata(101)).size()); + assertEquals( + 2, client.partitionBlobListForRegistrationRequest(createTestBlobMetadata(99, 2)).size()); + assertEquals( + 2, + client + .partitionBlobListForRegistrationRequest(createTestBlobMetadata(55, 44, 2, 98)) + .size()); + assertEquals( + 3, + client + .partitionBlobListForRegistrationRequest(createTestBlobMetadata(55, 44, 2, 99)) + .size()); + assertEquals( + 3, + client + .partitionBlobListForRegistrationRequest(createTestBlobMetadata(55, 44, 2, 99, 1)) + .size()); + } + + /** + * Generate blob metadata with specified number of chunks per blob + * + * @param numbersOfChunks Array of chunk numbers per blob + * @return List of blob metadata + */ + private List createTestBlobMetadata(int... numbersOfChunks) { + List result = new ArrayList<>(); + for (int n : numbersOfChunks) { + List chunkMetadata = new ArrayList<>(); + for (int i = 0; i < n; i++) { + ChunkMetadata chunk = + ChunkMetadata.builder() + .setOwningTableFromChannelContext(channel1.getChannelContext()) + .setChunkStartOffset(0L) + .setChunkLength(1) + .setEncryptionKeyId(0L) + .setChunkMD5("") + .setEpInfo(new EpInfo()) + .setChannelList(new ArrayList<>()) + .setFirstInsertTimeInMs(0L) + .setLastInsertTimeInMs(0L) + .build(); + chunkMetadata.add(chunk); + } + + result.add(new BlobMetadata("", "", chunkMetadata, new BlobStats())); + } + return result; + } + @Test public void testRegisterBlobsRetriesSucceeds() throws Exception { Pair, Set> testData = getRetryBlobMetadata(); @@ -948,13 +1023,13 @@ public void testRegisterBlobsRetriesSucceeds() throws Exception { CloseableHttpResponse httpResponse = Mockito.mock(CloseableHttpResponse.class); StatusLine statusLine = Mockito.mock(StatusLine.class); HttpEntity httpEntity = Mockito.mock(HttpEntity.class); - Mockito.when(statusLine.getStatusCode()).thenReturn(200); - Mockito.when(httpResponse.getStatusLine()).thenReturn(statusLine); - Mockito.when(httpResponse.getEntity()).thenReturn(httpEntity); - Mockito.when(httpEntity.getContent()) + when(statusLine.getStatusCode()).thenReturn(200); + when(httpResponse.getStatusLine()).thenReturn(statusLine); + when(httpResponse.getEntity()).thenReturn(httpEntity); + when(httpEntity.getContent()) .thenReturn( IOUtils.toInputStream(responseString), IOUtils.toInputStream(retryResponseString)); - Mockito.when(httpClient.execute(Mockito.any())).thenReturn(httpResponse); + when(httpClient.execute(Mockito.any())).thenReturn(httpResponse); RequestBuilder requestBuilder = Mockito.spy( @@ -1025,11 +1100,11 @@ public void testRegisterBlobResponseWithInvalidChannel() throws Exception { CloseableHttpResponse httpResponse = Mockito.mock(CloseableHttpResponse.class); StatusLine statusLine = Mockito.mock(StatusLine.class); HttpEntity httpEntity = Mockito.mock(HttpEntity.class); - Mockito.when(statusLine.getStatusCode()).thenReturn(200); - Mockito.when(httpResponse.getStatusLine()).thenReturn(statusLine); - Mockito.when(httpResponse.getEntity()).thenReturn(httpEntity); - Mockito.when(httpEntity.getContent()).thenReturn(IOUtils.toInputStream(response)); - Mockito.when(httpClient.execute(Mockito.any())).thenReturn(httpResponse); + when(statusLine.getStatusCode()).thenReturn(200); + when(httpResponse.getStatusLine()).thenReturn(statusLine); + when(httpResponse.getEntity()).thenReturn(httpEntity); + when(httpEntity.getContent()).thenReturn(IOUtils.toInputStream(response)); + when(httpClient.execute(Mockito.any())).thenReturn(httpResponse); RequestBuilder requestBuilder = new RequestBuilder(TestUtils.getHost(), TestUtils.getUser(), TestUtils.getKeyPair()); @@ -1149,7 +1224,7 @@ public void testCloseWithError() throws Exception { CompletableFuture future = new CompletableFuture<>(); future.completeExceptionally(new Exception("Simulating Error")); - Mockito.when(client.flush(true)).thenReturn(future); + when(client.flush(true)).thenReturn(future); Assert.assertFalse(client.isClosed()); try { @@ -1252,11 +1327,11 @@ public void testGetLatestCommittedOffsetTokens() throws Exception { CloseableHttpResponse httpResponse = Mockito.mock(CloseableHttpResponse.class); StatusLine statusLine = Mockito.mock(StatusLine.class); HttpEntity httpEntity = Mockito.mock(HttpEntity.class); - Mockito.when(statusLine.getStatusCode()).thenReturn(200); - Mockito.when(httpResponse.getStatusLine()).thenReturn(statusLine); - Mockito.when(httpResponse.getEntity()).thenReturn(httpEntity); - Mockito.when(httpEntity.getContent()).thenReturn(IOUtils.toInputStream(responseString)); - Mockito.when(httpClient.execute(Mockito.any())).thenReturn(httpResponse); + when(statusLine.getStatusCode()).thenReturn(200); + when(httpResponse.getStatusLine()).thenReturn(statusLine); + when(httpResponse.getEntity()).thenReturn(httpEntity); + when(httpEntity.getContent()).thenReturn(IOUtils.toInputStream(responseString)); + when(httpClient.execute(Mockito.any())).thenReturn(httpResponse); RequestBuilder requestBuilder = Mockito.spy(