diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java index ff879b8d6..b8a4d396d 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java @@ -350,6 +350,16 @@ void distributeFlushTasks() { if (!leftoverChannelsDataPerTable.isEmpty()) { channelsDataPerTable.addAll(leftoverChannelsDataPerTable); leftoverChannelsDataPerTable.clear(); + } else if (blobData.size() + >= this.owningClient.getParameterProvider().getMaxChunksInBlob()) { + // Create a new blob if the current one already contains max allowed number of chunks + logger.logInfo( + "Max allowed number of chunks in the current blob reached. chunkCount={}" + + " maxChunkCount={} currentBlobPath={}", + blobData.size(), + this.owningClient.getParameterProvider().getMaxChunksInBlob(), + blobPath); + break; } else { ConcurrentHashMap> table = itr.next().getValue(); diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java index 5916e472d..2b72fb51c 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java @@ -452,7 +452,42 @@ ChannelsStatusResponse getChannelsStatus( * @param blobs list of uploaded blobs */ void registerBlobs(List blobs) { - this.registerBlobs(blobs, 0); + for (List blobBatch : partitionBlobListForRegistrationRequest(blobs)) { + this.registerBlobs(blobBatch, 0); + } + } + + /** + * Partition the collection of blobs into sub-lists, so that the total number of chunks in each + * sublist does not exceed the max allowed number of chunks in one registration request. + */ + List> partitionBlobListForRegistrationRequest(List blobs) { + List> result = new ArrayList<>(); + List currentBatch = new ArrayList<>(); + int chunksInCurrentBatch = 0; + + for (BlobMetadata blob : blobs) { + if (chunksInCurrentBatch + blob.getChunks().size() + > parameterProvider.getMaxChunksInRegistrationRequest()) { + // Newly added BDEC file would exceed the max number of chunks in a single registration + // request. We put chunks collected so far into the result list and create a new batch with + // the current blob + result.add(currentBatch); + currentBatch = new ArrayList<>(); + currentBatch.add(blob); + chunksInCurrentBatch = blob.getChunks().size(); + } else { + // Newly added BDEC can be added to the current batch because it does not exceed the max + // number of chunks in a single registration request, yet. + currentBatch.add(blob); + chunksInCurrentBatch += blob.getChunks().size(); + } + } + + if (!currentBatch.isEmpty()) { + result.add(currentBatch); + } + return result; } /** diff --git a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java index 5c6f81f66..d8d7b7244 100644 --- a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java +++ b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java @@ -31,6 +31,9 @@ public class ParameterProvider { public static final String MAX_CHUNK_SIZE_IN_BYTES = "MAX_CHUNK_SIZE_IN_BYTES".toLowerCase(); public static final String MAX_ALLOWED_ROW_SIZE_IN_BYTES = "MAX_ALLOWED_ROW_SIZE_IN_BYTES".toLowerCase(); + public static final String MAX_CHUNKS_IN_BLOB = "MAX_CHUNKS_IN_BLOB".toLowerCase(); + public static final String MAX_CHUNKS_IN_REGISTRATION_REQUEST = + "MAX_CHUNKS_IN_REGISTRATION_REQUEST".toLowerCase(); public static final String MAX_CLIENT_LAG = "MAX_CLIENT_LAG".toLowerCase(); @@ -59,6 +62,8 @@ public class ParameterProvider { static final long MAX_CLIENT_LAG_MS_MAX = TimeUnit.MINUTES.toMillis(10); public static final long MAX_ALLOWED_ROW_SIZE_IN_BYTES_DEFAULT = 64 * 1024 * 1024; // 64 MB + public static final int MAX_CHUNKS_IN_BLOB_DEFAULT = 20; + public static final int MAX_CHUNKS_IN_REGISTRATION_REQUEST_DEFAULT = 100; /* Parameter that enables using internal Parquet buffers for buffering of rows before serializing. It reduces memory consumption compared to using Java Objects for buffering.*/ @@ -79,6 +84,7 @@ public class ParameterProvider { */ public ParameterProvider(Map parameterOverrides, Properties props) { this.setParameterMap(parameterOverrides, props); + this.validateParameters(); } /** Empty constructor for tests */ @@ -170,6 +176,12 @@ private void setParameterMap(Map parameterOverrides, Properties this.updateValue(MAX_CLIENT_LAG, MAX_CLIENT_LAG_DEFAULT, parameterOverrides, props); this.updateValue( MAX_CLIENT_LAG_ENABLED, MAX_CLIENT_LAG_ENABLED_DEFAULT, parameterOverrides, props); + this.updateValue(MAX_CHUNKS_IN_BLOB, MAX_CHUNKS_IN_BLOB_DEFAULT, parameterOverrides, props); + this.updateValue( + MAX_CHUNKS_IN_REGISTRATION_REQUEST, + MAX_CHUNKS_IN_REGISTRATION_REQUEST_DEFAULT, + parameterOverrides, + props); } /** @return Longest interval in milliseconds between buffer flushes */ @@ -369,6 +381,7 @@ public long getMaxChunkSizeInBytes() { return (val instanceof String) ? Long.parseLong(val.toString()) : (long) val; } + /** @return The max allow row size (in bytes) */ public long getMaxAllowedRowSizeInBytes() { Object val = this.parameterMap.getOrDefault( @@ -376,6 +389,38 @@ public long getMaxAllowedRowSizeInBytes() { return (val instanceof String) ? Long.parseLong(val.toString()) : (long) val; } + /** @return The max number of chunks that can be put into a single BDEC file */ + public int getMaxChunksInBlob() { + Object val = this.parameterMap.getOrDefault(MAX_CHUNKS_IN_BLOB, MAX_CHUNKS_IN_BLOB_DEFAULT); + return (val instanceof String) ? Integer.parseInt(val.toString()) : (int) val; + } + + /** + * @return The max number of chunks that can be put into a single BDEC registration request. Must + * be higher than MAX_CHUNKS_IN_BLOB. + */ + public int getMaxChunksInRegistrationRequest() { + Object val = + this.parameterMap.getOrDefault( + MAX_CHUNKS_IN_REGISTRATION_REQUEST, MAX_CHUNKS_IN_REGISTRATION_REQUEST_DEFAULT); + return (val instanceof String) ? Integer.parseInt(val.toString()) : (int) val; + } + + /** Validates parameters */ + private void validateParameters() { + if (this.getMaxChunksInBlob() >= this.getMaxChunksInRegistrationRequest()) { + throw new SFException( + ErrorCode.INVALID_CONFIG_PARAMETER, + String.format( + "Value of configuration property %s (%d) must be smaller than the value of" + + " configuration property %s (%d).", + MAX_CHUNKS_IN_BLOB, + getMaxChunksInBlob(), + MAX_CHUNKS_IN_REGISTRATION_REQUEST, + getMaxChunksInRegistrationRequest())); + } + } + @Override public String toString() { return "ParameterProvider{" + "parameterMap=" + parameterMap + '}'; diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java index 5a77a51db..b3c24f45f 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java @@ -35,6 +35,7 @@ import java.util.List; import java.util.Map; import java.util.TimeZone; +import java.util.UUID; import java.util.concurrent.TimeUnit; import javax.crypto.BadPaddingException; import javax.crypto.IllegalBlockSizeException; @@ -273,7 +274,22 @@ TestContext>> create() { } } - TestContextFactory testContextFactory; + TestContextFactory>> testContextFactory; + + private SnowflakeStreamingIngestChannelInternal>> addChannel( + TestContext>> testContext, int tableId, long encryptionKeyId) { + return testContext + .channelBuilder("channel" + UUID.randomUUID()) + .setDBName("db1") + .setSchemaName("PUBLIC") + .setTableName("table" + tableId) + .setOffsetToken("offset1") + .setChannelSequencer(0L) + .setRowSequencer(0L) + .setEncryptionKey("key") + .setEncryptionKeyId(encryptionKeyId) + .buildAndAdd(); + } private SnowflakeStreamingIngestChannelInternal addChannel1(TestContext testContext) { return testContext @@ -546,6 +562,107 @@ public void testBlobSplitDueToChunkSizeLimit() throws Exception { Mockito.verify(flushService, Mockito.times(2)).buildAndUpload(Mockito.any(), Mockito.any()); } + @Test + public void testBlobSplitDueToNumberOfChunks() throws Exception { + for (int rowCount : Arrays.asList(0, 1, 30, 111, 159, 287, 1287, 1599, 4496)) { + runTestBlobSplitDueToNumberOfChunks(rowCount); + } + } + + /** + * Insert rows in batches of 3 into each table and assert that the expected number of blobs is + * generated. + * + * @param numberOfRows How many rows to insert + */ + public void runTestBlobSplitDueToNumberOfChunks(int numberOfRows) throws Exception { + int channelsPerTable = 3; + int expectedBlobs = + (int) + Math.ceil( + (double) numberOfRows + / channelsPerTable + / ParameterProvider.MAX_CHUNKS_IN_BLOB_DEFAULT); + + final TestContext>> testContext = testContextFactory.create(); + + for (int i = 0; i < numberOfRows; i++) { + SnowflakeStreamingIngestChannelInternal>> channel = + addChannel(testContext, i / channelsPerTable, 1); + channel.setupSchema(Collections.singletonList(createLargeTestTextColumn("C1"))); + channel.insertRow(Collections.singletonMap("C1", i), ""); + } + + FlushService>> flushService = testContext.flushService; + flushService.flush(true).get(); + + ArgumentCaptor>>>>> blobDataCaptor = + ArgumentCaptor.forClass(List.class); + Mockito.verify(flushService, Mockito.times(expectedBlobs)) + .buildAndUpload(Mockito.any(), blobDataCaptor.capture()); + + // 1. list => blobs; 2. list => chunks; 3. list => channels; 4. list => rows, 5. list => columns + List>>>>> allUploadedBlobs = + blobDataCaptor.getAllValues(); + + Assert.assertEquals(numberOfRows, getRows(allUploadedBlobs).size()); + } + + @Test + public void testBlobSplitDueToNumberOfChunksWithLeftoverChannels() throws Exception { + final TestContext>> testContext = testContextFactory.create(); + + for (int i = 0; i < 19; i++) { // 19 simple chunks + SnowflakeStreamingIngestChannelInternal>> channel = + addChannel(testContext, i, 1); + channel.setupSchema(Collections.singletonList(createLargeTestTextColumn("C1"))); + channel.insertRow(Collections.singletonMap("C1", i), ""); + } + + // 20th chunk would contain multiple channels, but there are some with different encryption key + // ID, so they spill to a new blob + SnowflakeStreamingIngestChannelInternal>> channel1 = + addChannel(testContext, 19, 1); + channel1.setupSchema(Collections.singletonList(createLargeTestTextColumn("C1"))); + channel1.insertRow(Collections.singletonMap("C1", 19), ""); + + SnowflakeStreamingIngestChannelInternal>> channel2 = + addChannel(testContext, 19, 2); + channel2.setupSchema(Collections.singletonList(createLargeTestTextColumn("C1"))); + channel2.insertRow(Collections.singletonMap("C1", 19), ""); + + SnowflakeStreamingIngestChannelInternal>> channel3 = + addChannel(testContext, 19, 2); + channel3.setupSchema(Collections.singletonList(createLargeTestTextColumn("C1"))); + channel3.insertRow(Collections.singletonMap("C1", 19), ""); + + FlushService>> flushService = testContext.flushService; + flushService.flush(true).get(); + + ArgumentCaptor>>>>> blobDataCaptor = + ArgumentCaptor.forClass(List.class); + Mockito.verify(flushService, Mockito.atLeast(2)) + .buildAndUpload(Mockito.any(), blobDataCaptor.capture()); + + // 1. list => blobs; 2. list => chunks; 3. list => channels; 4. list => rows, 5. list => columns + List>>>>> allUploadedBlobs = + blobDataCaptor.getAllValues(); + + Assert.assertEquals(22, getRows(allUploadedBlobs).size()); + } + + private List> getRows(List>>>>> blobs) { + List> result = new ArrayList<>(); + blobs.forEach( + chunks -> + chunks.forEach( + channels -> + channels.forEach( + chunkData -> + result.addAll(((ParquetChunkData) chunkData.getVectors()).rows)))); + return result; + } + @Test public void testBuildAndUpload() throws Exception { long expectedBuildLatencyMs = 100; diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/ManyTablesIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/ManyTablesIT.java new file mode 100644 index 000000000..55844ca8f --- /dev/null +++ b/src/test/java/net/snowflake/ingest/streaming/internal/ManyTablesIT.java @@ -0,0 +1,99 @@ +package net.snowflake.ingest.streaming.internal; + +import static net.snowflake.ingest.utils.Constants.ROLE; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Collections; +import java.util.Map; +import java.util.Properties; +import net.snowflake.ingest.TestUtils; +import net.snowflake.ingest.streaming.OpenChannelRequest; +import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel; +import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient; +import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClientFactory; +import net.snowflake.ingest.utils.Constants; +import net.snowflake.ingest.utils.ParameterProvider; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Verified that ingestion work when we ingest into large number of tables from the same client and + * blobs and registration requests have to be cut, so they don't contain large number of chunks + */ +public class ManyTablesIT { + + private static final int TABLES_COUNT = 100; + private static final int TOTAL_ROWS_COUNT = 20_000; + private String dbName; + private SnowflakeStreamingIngestClient client; + private Connection connection; + private SnowflakeStreamingIngestChannel[] channels; + private String[] offsetTokensPerChannel; + + @Before + public void setUp() throws Exception { + Properties props = TestUtils.getProperties(Constants.BdecVersion.THREE, false); + props.put(ParameterProvider.MAX_CHUNKS_IN_BLOB, 2); + props.put(ParameterProvider.MAX_CHUNKS_IN_REGISTRATION_REQUEST, 3); + if (props.getProperty(ROLE).equals("DEFAULT_ROLE")) { + props.setProperty(ROLE, "ACCOUNTADMIN"); + } + client = SnowflakeStreamingIngestClientFactory.builder("client1").setProperties(props).build(); + connection = TestUtils.getConnection(true); + dbName = String.format("sdk_it_many_tables_db_%d", System.nanoTime()); + + channels = new SnowflakeStreamingIngestChannel[TABLES_COUNT]; + offsetTokensPerChannel = new String[TABLES_COUNT]; + connection.createStatement().execute(String.format("create database %s;", dbName)); + + String[] tableNames = new String[TABLES_COUNT]; + for (int i = 0; i < tableNames.length; i++) { + tableNames[i] = String.format("table_%d", i); + connection.createStatement().execute(String.format("create table table_%d(c int);", i)); + channels[i] = + client.openChannel( + OpenChannelRequest.builder(String.format("channel-%d", i)) + .setDBName(dbName) + .setSchemaName("public") + .setTableName(tableNames[i]) + .setOnErrorOption(OpenChannelRequest.OnErrorOption.ABORT) + .build()); + } + } + + @After + public void tearDown() throws Exception { + connection.createStatement().execute(String.format("drop database %s;", dbName)); + client.close(); + connection.close(); + } + + @Test + public void testIngestionIntoManyTables() throws InterruptedException, SQLException { + for (int i = 0; i < TOTAL_ROWS_COUNT; i++) { + Map row = Collections.singletonMap("c", i); + String offset = String.valueOf(i); + int channelId = i % channels.length; + channels[channelId].insertRow(row, offset); + offsetTokensPerChannel[channelId] = offset; + } + + for (int i = 0; i < channels.length; i++) { + TestUtils.waitForOffset(channels[i], offsetTokensPerChannel[i]); + } + + int totalRowsCount = 0; + ResultSet rs = + connection + .createStatement() + .executeQuery(String.format("show tables in database %s;", dbName)); + while (rs.next()) { + totalRowsCount += rs.getInt("rows"); + } + Assert.assertEquals(TOTAL_ROWS_COUNT, totalRowsCount); + } +} diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java index def5f7ecf..bd1a7ab2b 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java @@ -3,7 +3,9 @@ import java.util.HashMap; import java.util.Map; import java.util.Properties; +import net.snowflake.ingest.utils.ErrorCode; import net.snowflake.ingest.utils.ParameterProvider; +import net.snowflake.ingest.utils.SFException; import org.junit.Assert; import org.junit.Test; @@ -272,4 +274,29 @@ public void testMaxClientLagEnabledThresholdAbove() { Assert.assertTrue(e.getMessage().startsWith("Lag falls outside")); } } + + @Test + public void testMaxChunksInBlobAndRegistrationRequest() { + Properties prop = new Properties(); + Map parameterMap = getStartingParameterMap(); + parameterMap.put("max_chunks_in_blob", 1); + parameterMap.put("max_chunks_in_registration_request", 2); + ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop); + Assert.assertEquals(1, parameterProvider.getMaxChunksInBlob()); + Assert.assertEquals(2, parameterProvider.getMaxChunksInRegistrationRequest()); + } + + @Test + public void testValidationMaxChunksInBlobAndRegistrationRequest() { + Properties prop = new Properties(); + Map parameterMap = getStartingParameterMap(); + parameterMap.put("max_chunks_in_blob", 2); + parameterMap.put("max_chunks_in_registration_request", 1); + try { + new ParameterProvider(parameterMap, prop); + Assert.fail("Should not have succeeded"); + } catch (SFException e) { + Assert.assertEquals(ErrorCode.INVALID_CONFIG_PARAMETER.getMessageCode(), e.getVendorCode()); + } + } } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java index a99054c9f..6a9498b0e 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java @@ -11,6 +11,8 @@ import static net.snowflake.ingest.utils.Constants.ROLE; import static net.snowflake.ingest.utils.Constants.USER; import static net.snowflake.ingest.utils.ParameterProvider.ENABLE_SNOWPIPE_STREAMING_METRICS; +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.when; import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.PropertyAccessor; @@ -319,11 +321,11 @@ public void testGetChannelsStatusWithRequest() throws Exception { CloseableHttpResponse httpResponse = Mockito.mock(CloseableHttpResponse.class); StatusLine statusLine = Mockito.mock(StatusLine.class); HttpEntity httpEntity = Mockito.mock(HttpEntity.class); - Mockito.when(statusLine.getStatusCode()).thenReturn(200); - Mockito.when(httpResponse.getStatusLine()).thenReturn(statusLine); - Mockito.when(httpResponse.getEntity()).thenReturn(httpEntity); - Mockito.when(httpEntity.getContent()).thenReturn(IOUtils.toInputStream(responseString)); - Mockito.when(httpClient.execute(Mockito.any())).thenReturn(httpResponse); + when(statusLine.getStatusCode()).thenReturn(200); + when(httpResponse.getStatusLine()).thenReturn(statusLine); + when(httpResponse.getEntity()).thenReturn(httpEntity); + when(httpEntity.getContent()).thenReturn(IOUtils.toInputStream(responseString)); + when(httpClient.execute(Mockito.any())).thenReturn(httpResponse); RequestBuilder requestBuilder = Mockito.spy( @@ -378,11 +380,11 @@ public void testGetChannelsStatusWithRequestError() throws Exception { CloseableHttpResponse httpResponse = Mockito.mock(CloseableHttpResponse.class); StatusLine statusLine = Mockito.mock(StatusLine.class); HttpEntity httpEntity = Mockito.mock(HttpEntity.class); - Mockito.when(statusLine.getStatusCode()).thenReturn(500); - Mockito.when(httpResponse.getStatusLine()).thenReturn(statusLine); - Mockito.when(httpResponse.getEntity()).thenReturn(httpEntity); - Mockito.when(httpEntity.getContent()).thenReturn(IOUtils.toInputStream(responseString)); - Mockito.when(httpClient.execute(Mockito.any())).thenReturn(httpResponse); + when(statusLine.getStatusCode()).thenReturn(500); + when(httpResponse.getStatusLine()).thenReturn(statusLine); + when(httpResponse.getEntity()).thenReturn(httpEntity); + when(httpEntity.getContent()).thenReturn(IOUtils.toInputStream(responseString)); + when(httpClient.execute(Mockito.any())).thenReturn(httpResponse); RequestBuilder requestBuilder = Mockito.spy( @@ -645,12 +647,12 @@ public void testRegisterBlobErrorResponse() throws Exception { CloseableHttpResponse httpResponse = Mockito.mock(CloseableHttpResponse.class); StatusLine statusLine = Mockito.mock(StatusLine.class); HttpEntity httpEntity = Mockito.mock(HttpEntity.class); - Mockito.when(statusLine.getStatusCode()).thenReturn(500); - Mockito.when(httpResponse.getStatusLine()).thenReturn(statusLine); - Mockito.when(httpResponse.getEntity()).thenReturn(httpEntity); + when(statusLine.getStatusCode()).thenReturn(500); + when(httpResponse.getStatusLine()).thenReturn(statusLine); + when(httpResponse.getEntity()).thenReturn(httpEntity); String response = "testRegisterBlobErrorResponse"; - Mockito.when(httpEntity.getContent()).thenReturn(IOUtils.toInputStream(response)); - Mockito.when(httpClient.execute(Mockito.any())).thenReturn(httpResponse); + when(httpEntity.getContent()).thenReturn(IOUtils.toInputStream(response)); + when(httpClient.execute(Mockito.any())).thenReturn(httpResponse); RequestBuilder requestBuilder = new RequestBuilder(TestUtils.getHost(), TestUtils.getUser(), TestUtils.getKeyPair()); @@ -693,11 +695,11 @@ public void testRegisterBlobSnowflakeInternalErrorResponse() throws Exception { CloseableHttpResponse httpResponse = Mockito.mock(CloseableHttpResponse.class); StatusLine statusLine = Mockito.mock(StatusLine.class); HttpEntity httpEntity = Mockito.mock(HttpEntity.class); - Mockito.when(statusLine.getStatusCode()).thenReturn(200); - Mockito.when(httpResponse.getStatusLine()).thenReturn(statusLine); - Mockito.when(httpResponse.getEntity()).thenReturn(httpEntity); - Mockito.when(httpEntity.getContent()).thenReturn(IOUtils.toInputStream(response)); - Mockito.when(httpClient.execute(Mockito.any())).thenReturn(httpResponse); + when(statusLine.getStatusCode()).thenReturn(200); + when(httpResponse.getStatusLine()).thenReturn(statusLine); + when(httpResponse.getEntity()).thenReturn(httpEntity); + when(httpEntity.getContent()).thenReturn(IOUtils.toInputStream(response)); + when(httpClient.execute(Mockito.any())).thenReturn(httpResponse); RequestBuilder requestBuilder = new RequestBuilder(TestUtils.getHost(), TestUtils.getUser(), TestUtils.getKeyPair()); @@ -749,11 +751,11 @@ public void testRegisterBlobSuccessResponse() throws Exception { CloseableHttpResponse httpResponse = Mockito.mock(CloseableHttpResponse.class); StatusLine statusLine = Mockito.mock(StatusLine.class); HttpEntity httpEntity = Mockito.mock(HttpEntity.class); - Mockito.when(statusLine.getStatusCode()).thenReturn(200); - Mockito.when(httpResponse.getStatusLine()).thenReturn(statusLine); - Mockito.when(httpResponse.getEntity()).thenReturn(httpEntity); - Mockito.when(httpEntity.getContent()).thenReturn(IOUtils.toInputStream(response)); - Mockito.when(httpClient.execute(Mockito.any())).thenReturn(httpResponse); + when(statusLine.getStatusCode()).thenReturn(200); + when(httpResponse.getStatusLine()).thenReturn(statusLine); + when(httpResponse.getEntity()).thenReturn(httpEntity); + when(httpEntity.getContent()).thenReturn(IOUtils.toInputStream(response)); + when(httpClient.execute(Mockito.any())).thenReturn(httpResponse); RequestBuilder requestBuilder = new RequestBuilder(TestUtils.getHost(), TestUtils.getUser(), TestUtils.getKeyPair()); @@ -827,16 +829,16 @@ public void testRegisterBlobsRetries() throws Exception { CloseableHttpResponse httpResponse = Mockito.mock(CloseableHttpResponse.class); StatusLine statusLine = Mockito.mock(StatusLine.class); HttpEntity httpEntity = Mockito.mock(HttpEntity.class); - Mockito.when(statusLine.getStatusCode()).thenReturn(200); - Mockito.when(httpResponse.getStatusLine()).thenReturn(statusLine); - Mockito.when(httpResponse.getEntity()).thenReturn(httpEntity); - Mockito.when(httpEntity.getContent()) + when(statusLine.getStatusCode()).thenReturn(200); + when(httpResponse.getStatusLine()).thenReturn(statusLine); + when(httpResponse.getEntity()).thenReturn(httpEntity); + when(httpEntity.getContent()) .thenReturn( IOUtils.toInputStream(responseString), IOUtils.toInputStream(retryResponseString), IOUtils.toInputStream(retryResponseString), IOUtils.toInputStream(retryResponseString)); - Mockito.when(httpClient.execute(Mockito.any())).thenReturn(httpResponse); + when(httpClient.execute(Mockito.any())).thenReturn(httpResponse); RequestBuilder requestBuilder = Mockito.spy( @@ -862,6 +864,79 @@ public void testRegisterBlobsRetries() throws Exception { Assert.assertFalse(channel2.isValid()); } + @Test + public void testRegisterBlobChunkLimit() throws Exception { + CloseableHttpClient httpClient = Mockito.mock(CloseableHttpClient.class); + RequestBuilder requestBuilder = + Mockito.spy( + new RequestBuilder(TestUtils.getHost(), TestUtils.getUser(), TestUtils.getKeyPair())); + + SnowflakeStreamingIngestClientInternal client = + Mockito.spy( + new SnowflakeStreamingIngestClientInternal<>( + "client", + new SnowflakeURL("snowflake.dev.local:8082"), + null, + httpClient, + true, + requestBuilder, + null)); + + assertEquals(0, client.partitionBlobListForRegistrationRequest(new ArrayList<>()).size()); + assertEquals( + 1, client.partitionBlobListForRegistrationRequest(createTestBlobMetadata(99)).size()); + assertEquals( + 2, client.partitionBlobListForRegistrationRequest(createTestBlobMetadata(101)).size()); + assertEquals( + 2, client.partitionBlobListForRegistrationRequest(createTestBlobMetadata(99, 2)).size()); + assertEquals( + 2, + client + .partitionBlobListForRegistrationRequest(createTestBlobMetadata(55, 44, 2, 98)) + .size()); + assertEquals( + 3, + client + .partitionBlobListForRegistrationRequest(createTestBlobMetadata(55, 44, 2, 99)) + .size()); + assertEquals( + 3, + client + .partitionBlobListForRegistrationRequest(createTestBlobMetadata(55, 44, 2, 99, 1)) + .size()); + } + + /** + * Generate blob metadata with specified number of chunks per blob + * + * @param numbersOfChunks Array of chunk numbers per blob + * @return List of blob metadata + */ + private List createTestBlobMetadata(int... numbersOfChunks) { + List result = new ArrayList<>(); + for (int n : numbersOfChunks) { + List chunkMetadata = new ArrayList<>(); + for (int i = 0; i < n; i++) { + ChunkMetadata chunk = + ChunkMetadata.builder() + .setOwningTableFromChannelContext(channel1.getChannelContext()) + .setChunkStartOffset(0L) + .setChunkLength(1) + .setEncryptionKeyId(0L) + .setChunkMD5("") + .setEpInfo(new EpInfo()) + .setChannelList(new ArrayList<>()) + .setFirstInsertTimeInMs(0L) + .setLastInsertTimeInMs(0L) + .build(); + chunkMetadata.add(chunk); + } + + result.add(new BlobMetadata("", "", chunkMetadata, new BlobStats())); + } + return result; + } + @Test public void testRegisterBlobsRetriesSucceeds() throws Exception { Pair, Set> testData = getRetryBlobMetadata(); @@ -945,13 +1020,13 @@ public void testRegisterBlobsRetriesSucceeds() throws Exception { CloseableHttpResponse httpResponse = Mockito.mock(CloseableHttpResponse.class); StatusLine statusLine = Mockito.mock(StatusLine.class); HttpEntity httpEntity = Mockito.mock(HttpEntity.class); - Mockito.when(statusLine.getStatusCode()).thenReturn(200); - Mockito.when(httpResponse.getStatusLine()).thenReturn(statusLine); - Mockito.when(httpResponse.getEntity()).thenReturn(httpEntity); - Mockito.when(httpEntity.getContent()) + when(statusLine.getStatusCode()).thenReturn(200); + when(httpResponse.getStatusLine()).thenReturn(statusLine); + when(httpResponse.getEntity()).thenReturn(httpEntity); + when(httpEntity.getContent()) .thenReturn( IOUtils.toInputStream(responseString), IOUtils.toInputStream(retryResponseString)); - Mockito.when(httpClient.execute(Mockito.any())).thenReturn(httpResponse); + when(httpClient.execute(Mockito.any())).thenReturn(httpResponse); RequestBuilder requestBuilder = Mockito.spy( @@ -1022,11 +1097,11 @@ public void testRegisterBlobResponseWithInvalidChannel() throws Exception { CloseableHttpResponse httpResponse = Mockito.mock(CloseableHttpResponse.class); StatusLine statusLine = Mockito.mock(StatusLine.class); HttpEntity httpEntity = Mockito.mock(HttpEntity.class); - Mockito.when(statusLine.getStatusCode()).thenReturn(200); - Mockito.when(httpResponse.getStatusLine()).thenReturn(statusLine); - Mockito.when(httpResponse.getEntity()).thenReturn(httpEntity); - Mockito.when(httpEntity.getContent()).thenReturn(IOUtils.toInputStream(response)); - Mockito.when(httpClient.execute(Mockito.any())).thenReturn(httpResponse); + when(statusLine.getStatusCode()).thenReturn(200); + when(httpResponse.getStatusLine()).thenReturn(statusLine); + when(httpResponse.getEntity()).thenReturn(httpEntity); + when(httpEntity.getContent()).thenReturn(IOUtils.toInputStream(response)); + when(httpClient.execute(Mockito.any())).thenReturn(httpResponse); RequestBuilder requestBuilder = new RequestBuilder(TestUtils.getHost(), TestUtils.getUser(), TestUtils.getKeyPair()); @@ -1146,7 +1221,7 @@ public void testCloseWithError() throws Exception { CompletableFuture future = new CompletableFuture<>(); future.completeExceptionally(new Exception("Simulating Error")); - Mockito.when(client.flush(true)).thenReturn(future); + when(client.flush(true)).thenReturn(future); Assert.assertFalse(client.isClosed()); try { @@ -1249,11 +1324,11 @@ public void testGetLatestCommittedOffsetTokens() throws Exception { CloseableHttpResponse httpResponse = Mockito.mock(CloseableHttpResponse.class); StatusLine statusLine = Mockito.mock(StatusLine.class); HttpEntity httpEntity = Mockito.mock(HttpEntity.class); - Mockito.when(statusLine.getStatusCode()).thenReturn(200); - Mockito.when(httpResponse.getStatusLine()).thenReturn(statusLine); - Mockito.when(httpResponse.getEntity()).thenReturn(httpEntity); - Mockito.when(httpEntity.getContent()).thenReturn(IOUtils.toInputStream(responseString)); - Mockito.when(httpClient.execute(Mockito.any())).thenReturn(httpResponse); + when(statusLine.getStatusCode()).thenReturn(200); + when(httpResponse.getStatusLine()).thenReturn(statusLine); + when(httpResponse.getEntity()).thenReturn(httpEntity); + when(httpEntity.getContent()).thenReturn(IOUtils.toInputStream(responseString)); + when(httpClient.execute(Mockito.any())).thenReturn(httpResponse); RequestBuilder requestBuilder = Mockito.spy(