diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java
index b8a4d396d..cb1ef7810 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java
@@ -351,13 +351,15 @@ void distributeFlushTasks() {
             channelsDataPerTable.addAll(leftoverChannelsDataPerTable);
             leftoverChannelsDataPerTable.clear();
           } else if (blobData.size()
-              >= this.owningClient.getParameterProvider().getMaxChunksInBlob()) {
+              >= this.owningClient
+                  .getParameterProvider()
+                  .getMaxChunksInBlobAndRegistrationRequest()) {
             // Create a new blob if the current one already contains max allowed number of chunks
             logger.logInfo(
                 "Max allowed number of chunks in the current blob reached. chunkCount={}"
                     + " maxChunkCount={} currentBlobPath={}",
                 blobData.size(),
-                this.owningClient.getParameterProvider().getMaxChunksInBlob(),
+                this.owningClient.getParameterProvider().getMaxChunksInBlobAndRegistrationRequest(),
                 blobPath);
            break;
          } else {
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java
index 2b72fb51c..27ff9407e 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java
@@ -465,10 +465,21 @@ List<List<BlobMetadata>> partitionBlobListForRegistrationRequest(List<BlobMetadata> blobs) {
     List<List<BlobMetadata>> result = new ArrayList<>();
     List<BlobMetadata> currentBatch = new ArrayList<>();
     int chunksInCurrentBatch = 0;
+    int maxChunksInBlobAndRegistrationRequest =
+        parameterProvider.getMaxChunksInBlobAndRegistrationRequest();

     for (BlobMetadata blob : blobs) {
-      if (chunksInCurrentBatch + blob.getChunks().size()
-          > parameterProvider.getMaxChunksInRegistrationRequest()) {
+      if (blob.getChunks().size() > maxChunksInBlobAndRegistrationRequest) {
+        throw new SFException(
+            ErrorCode.INTERNAL_ERROR,
+            String.format(
+                "Incorrectly generated blob detected - number of chunks in the blob is larger than"
+                    + " the max allowed number of chunks. Please report this bug to Snowflake."
+                    + " bdec=%s chunkCount=%d maxAllowedChunkCount=%d",
+                blob.getPath(), blob.getChunks().size(), maxChunksInBlobAndRegistrationRequest));
+      }
+
+      if (chunksInCurrentBatch + blob.getChunks().size() > maxChunksInBlobAndRegistrationRequest) {
         // Newly added BDEC file would exceed the max number of chunks in a single registration
         // request. We put chunks collected so far into the result list and create a new batch with
         // the current blob
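Reviewer note: the new guard plus the existing batching rule read as one small greedy algorithm. Here is a minimal, self-contained sketch of that invariant; the class name, the plain-int chunk counts, and IllegalStateException are illustrative stand-ins for the production BlobMetadata/SFException types, not the actual implementation:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ChunkBatchingSketch {

  /** Greedily packs blobs (here: their chunk counts) into batches of at most maxChunks chunks. */
  static List<List<Integer>> partition(List<Integer> chunkCounts, int maxChunks) {
    List<List<Integer>> result = new ArrayList<>();
    List<Integer> currentBatch = new ArrayList<>();
    int chunksInCurrentBatch = 0;
    for (int chunks : chunkCounts) {
      // Mirrors the new SFException guard: a single blob above the limit is an
      // upstream bug, since the flush path caps blobs with the same parameter.
      if (chunks > maxChunks) {
        throw new IllegalStateException("Blob has more chunks than allowed: " + chunks);
      }
      // Blob would overflow the current registration batch; seal it and start a new one.
      if (chunksInCurrentBatch + chunks > maxChunks) {
        result.add(currentBatch);
        currentBatch = new ArrayList<>();
        chunksInCurrentBatch = 0;
      }
      currentBatch.add(chunks);
      chunksInCurrentBatch += chunks;
    }
    if (!currentBatch.isEmpty()) {
      result.add(currentBatch);
    }
    return result;
  }

  public static void main(String[] args) {
    // With the default limit of 100: [3, 95, 2] fills one batch exactly, [1] spills
    // into a second, and [100] needs a third, matching the updated unit tests.
    System.out.println(partition(Arrays.asList(3, 95, 2, 1, 100), 100)); // [[3, 95, 2], [1], [100]]
  }
}
```

The throw can be unconditional because distributeFlushTasks above now caps blob construction with the same getMaxChunksInBlobAndRegistrationRequest() value, so an oversized blob indicates an internal bug rather than a configuration mismatch.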
diff --git a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java
index d8d7b7244..3a2221697 100644
--- a/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java
+++ b/src/main/java/net/snowflake/ingest/utils/ParameterProvider.java
@@ -31,9 +31,8 @@ public class ParameterProvider {
   public static final String MAX_CHUNK_SIZE_IN_BYTES = "MAX_CHUNK_SIZE_IN_BYTES".toLowerCase();
   public static final String MAX_ALLOWED_ROW_SIZE_IN_BYTES =
       "MAX_ALLOWED_ROW_SIZE_IN_BYTES".toLowerCase();
-  public static final String MAX_CHUNKS_IN_BLOB = "MAX_CHUNKS_IN_BLOB".toLowerCase();
-  public static final String MAX_CHUNKS_IN_REGISTRATION_REQUEST =
-      "MAX_CHUNKS_IN_REGISTRATION_REQUEST".toLowerCase();
+  public static final String MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST =
+      "MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST".toLowerCase();

   public static final String MAX_CLIENT_LAG = "MAX_CLIENT_LAG".toLowerCase();

@@ -62,8 +61,7 @@ public class ParameterProvider {
   static final long MAX_CLIENT_LAG_MS_MAX = TimeUnit.MINUTES.toMillis(10);

   public static final long MAX_ALLOWED_ROW_SIZE_IN_BYTES_DEFAULT = 64 * 1024 * 1024; // 64 MB
-  public static final int MAX_CHUNKS_IN_BLOB_DEFAULT = 20;
-  public static final int MAX_CHUNKS_IN_REGISTRATION_REQUEST_DEFAULT = 100;
+  public static final int MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_DEFAULT = 100;

   /* Parameter that enables using internal Parquet buffers for buffering of rows before serializing.
   It reduces memory consumption compared to using Java Objects for buffering.*/
@@ -84,7 +82,6 @@ public class ParameterProvider {
    */
   public ParameterProvider(Map<String, Object> parameterOverrides, Properties props) {
     this.setParameterMap(parameterOverrides, props);
-    this.validateParameters();
   }

   /** Empty constructor for tests */
@@ -176,10 +173,9 @@ private void setParameterMap(Map<String, Object> parameterOverrides, Properties props) {
     this.updateValue(MAX_CLIENT_LAG, MAX_CLIENT_LAG_DEFAULT, parameterOverrides, props);
     this.updateValue(
         MAX_CLIENT_LAG_ENABLED, MAX_CLIENT_LAG_ENABLED_DEFAULT, parameterOverrides, props);
-    this.updateValue(MAX_CHUNKS_IN_BLOB, MAX_CHUNKS_IN_BLOB_DEFAULT, parameterOverrides, props);
     this.updateValue(
-        MAX_CHUNKS_IN_REGISTRATION_REQUEST,
-        MAX_CHUNKS_IN_REGISTRATION_REQUEST_DEFAULT,
+        MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST,
+        MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_DEFAULT,
         parameterOverrides,
         props);
   }

@@ -389,38 +385,18 @@ public long getMaxAllowedRowSizeInBytes() {
     return (val instanceof String) ? Long.parseLong(val.toString()) : (long) val;
   }

-  /** @return The max number of chunks that can be put into a single BDEC file */
-  public int getMaxChunksInBlob() {
-    Object val = this.parameterMap.getOrDefault(MAX_CHUNKS_IN_BLOB, MAX_CHUNKS_IN_BLOB_DEFAULT);
-    return (val instanceof String) ? Integer.parseInt(val.toString()) : (int) val;
-  }
-
   /**
-   * @return The max number of chunks that can be put into a single BDEC registration request. Must
-   *     be higher than MAX_CHUNKS_IN_BLOB.
+   * @return The max number of chunks that can be put into a single BDEC or blob registration
+   *     request.
    */
-  public int getMaxChunksInRegistrationRequest() {
+  public int getMaxChunksInBlobAndRegistrationRequest() {
     Object val =
         this.parameterMap.getOrDefault(
-            MAX_CHUNKS_IN_REGISTRATION_REQUEST, MAX_CHUNKS_IN_REGISTRATION_REQUEST_DEFAULT);
+            MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST,
+            MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_DEFAULT);
     return (val instanceof String) ? Integer.parseInt(val.toString()) : (int) val;
   }

-  /** Validates parameters */
-  private void validateParameters() {
-    if (this.getMaxChunksInBlob() >= this.getMaxChunksInRegistrationRequest()) {
-      throw new SFException(
-          ErrorCode.INVALID_CONFIG_PARAMETER,
-          String.format(
-              "Value of configuration property %s (%d) must be smaller than the value of"
-                  + " configuration property %s (%d).",
-              MAX_CHUNKS_IN_BLOB,
-              getMaxChunksInBlob(),
-              MAX_CHUNKS_IN_REGISTRATION_REQUEST,
-              getMaxChunksInRegistrationRequest()));
-    }
-  }
-
   @Override
   public String toString() {
     return "ParameterProvider{" + "parameterMap=" + parameterMap + '}';
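With a single knob there is no cross-parameter constraint left to enforce, which is why the constructor drops validateParameters() along with the method itself. A short usage sketch of the merged parameter (the class name and the value 50 are illustrative; the constructor, constants, and getter are the ones shown in this diff):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import net.snowflake.ingest.utils.ParameterProvider;

public class ParameterOverrideSketch {
  public static void main(String[] args) {
    // Override keys are the lower-cased constant names; 50 is an arbitrary example value.
    Map<String, Object> overrides = new HashMap<>();
    overrides.put(ParameterProvider.MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST, 50);

    ParameterProvider provider = new ParameterProvider(overrides, new Properties());
    System.out.println(provider.getMaxChunksInBlobAndRegistrationRequest()); // 50

    // Without an override, the getter falls back to the new default of 100
    // via parameterMap.getOrDefault(..., MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_DEFAULT).
    System.out.println(new ParameterProvider().getMaxChunksInBlobAndRegistrationRequest()); // 100
  }
}
```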
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java
index b3c24f45f..17f929206 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java
@@ -582,7 +582,7 @@ public void runTestBlobSplitDueToNumberOfChunks(int numberOfRows) throws Exception {
         Math.ceil(
             (double) numberOfRows
                 / channelsPerTable
-                / ParameterProvider.MAX_CHUNKS_IN_BLOB_DEFAULT);
+                / ParameterProvider.MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_DEFAULT);

     final TestContext<List<List<Object>>> testContext = testContextFactory.create();

@@ -612,7 +612,7 @@ public void runTestBlobSplitDueToNumberOfChunks(int numberOfRows) throws Exception {
   public void testBlobSplitDueToNumberOfChunksWithLeftoverChannels() throws Exception {
     final TestContext<List<List<Object>>> testContext = testContextFactory.create();

-    for (int i = 0; i < 19; i++) { // 19 simple chunks
+    for (int i = 0; i < 99; i++) { // 99 simple chunks
      SnowflakeStreamingIngestChannelInternal<List<List<Object>>> channel =
          addChannel(testContext, i, 1);
      channel.setupSchema(Collections.singletonList(createLargeTestTextColumn("C1")));
@@ -622,19 +622,19 @@ public void testBlobSplitDueToNumberOfChunksWithLeftoverChannels() throws Exception {
       channel.insertRow(Collections.singletonMap("C1", i), "");
     }

-    // 20th chunk would contain multiple channels, but there are some with different encryption key
+    // 100th chunk would contain multiple channels, but there are some with different encryption key
     // ID, so they spill to a new blob
     SnowflakeStreamingIngestChannelInternal<List<List<Object>>> channel1 =
-        addChannel(testContext, 19, 1);
+        addChannel(testContext, 99, 1);
     channel1.setupSchema(Collections.singletonList(createLargeTestTextColumn("C1")));
-    channel1.insertRow(Collections.singletonMap("C1", 19), "");
+    channel1.insertRow(Collections.singletonMap("C1", 0), "");
     SnowflakeStreamingIngestChannelInternal<List<List<Object>>> channel2 =
-        addChannel(testContext, 19, 2);
+        addChannel(testContext, 99, 2);
     channel2.setupSchema(Collections.singletonList(createLargeTestTextColumn("C1")));
-    channel2.insertRow(Collections.singletonMap("C1", 19), "");
+    channel2.insertRow(Collections.singletonMap("C1", 0), "");
     SnowflakeStreamingIngestChannelInternal<List<List<Object>>> channel3 =
-        addChannel(testContext, 19, 2);
+        addChannel(testContext, 99, 2);
     channel3.setupSchema(Collections.singletonList(createLargeTestTextColumn("C1")));
-    channel3.insertRow(Collections.singletonMap("C1", 19), "");
+    channel3.insertRow(Collections.singletonMap("C1", 0), "");

     FlushService<List<List<Object>>> flushService = testContext.flushService;
     flushService.flush(true).get();
@@ -648,7 +648,7 @@ public void testBlobSplitDueToNumberOfChunksWithLeftoverChannels() throws Exception {
     List<List<List<ChannelData<List<List<Object>>>>>> allUploadedBlobs =
         blobDataCaptor.getAllValues();

-    Assert.assertEquals(22, getRows(allUploadedBlobs).size());
+    Assert.assertEquals(102, getRows(allUploadedBlobs).size());
   }

   private List<List<Object>> getRows(List<List<List<ChannelData<List<List<Object>>>>>> blobs) {
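The expected blob count in runTestBlobSplitDueToNumberOfChunks now divides by the merged default. A worked instance of that arithmetic, with made-up row and channel counts (the class name and numbers are illustrative only):

```java
public class ExpectedBlobCountSketch {
  // Same value as ParameterProvider.MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_DEFAULT
  static final int MAX_CHUNKS_DEFAULT = 100;

  public static void main(String[] args) {
    // Illustrative, not the test's actual parametrization: 500 rows spread over
    // tables with 2 channels each produce 250 single-table chunks, and
    // ceil(250 / 100) = 3 expected blobs.
    int numberOfRows = 500;
    int channelsPerTable = 2;
    int expectedBlobs =
        (int) Math.ceil((double) numberOfRows / channelsPerTable / MAX_CHUNKS_DEFAULT);
    System.out.println(expectedBlobs); // 3
  }
}
```

The leftover-channels test follows the same arithmetic: 99 single-channel chunks plus channel1's chunk fill the first blob at exactly the limit, channels 2 and 3 spill to a second blob because of their different encryption key ID, and the row total becomes 99 + 3 = 102, hence the updated assertion.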
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/ManyTablesIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/ManyTablesIT.java
index 55844ca8f..d32adfe3f 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/ManyTablesIT.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/ManyTablesIT.java
@@ -26,8 +26,8 @@
  */
 public class ManyTablesIT {
-  private static final int TABLES_COUNT = 100;
-  private static final int TOTAL_ROWS_COUNT = 20_000;
+  private static final int TABLES_COUNT = 20;
+  private static final int TOTAL_ROWS_COUNT = 200_000;
   private String dbName;
   private SnowflakeStreamingIngestClient client;
   private Connection connection;
@@ -37,8 +37,7 @@ public class ManyTablesIT {
   @Before
   public void setUp() throws Exception {
     Properties props = TestUtils.getProperties(Constants.BdecVersion.THREE, false);
-    props.put(ParameterProvider.MAX_CHUNKS_IN_BLOB, 2);
-    props.put(ParameterProvider.MAX_CHUNKS_IN_REGISTRATION_REQUEST, 3);
+    props.put(ParameterProvider.MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST, 2);
     if (props.getProperty(ROLE).equals("DEFAULT_ROLE")) {
       props.setProperty(ROLE, "ACCOUNTADMIN");
     }
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java
index bd1a7ab2b..7838763de 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/ParameterProviderTest.java
@@ -3,9 +3,7 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
-import net.snowflake.ingest.utils.ErrorCode;
 import net.snowflake.ingest.utils.ParameterProvider;
-import net.snowflake.ingest.utils.SFException;
 import org.junit.Assert;
 import org.junit.Test;

@@ -279,24 +277,8 @@ public void testMaxClientLagEnabledThresholdAbove() {
   public void testMaxChunksInBlobAndRegistrationRequest() {
     Properties prop = new Properties();
     Map<String, Object> parameterMap = getStartingParameterMap();
-    parameterMap.put("max_chunks_in_blob", 1);
-    parameterMap.put("max_chunks_in_registration_request", 2);
+    parameterMap.put("max_chunks_in_blob_and_registration_request", 1);
     ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop);
-    Assert.assertEquals(1, parameterProvider.getMaxChunksInBlob());
-    Assert.assertEquals(2, parameterProvider.getMaxChunksInRegistrationRequest());
-  }
-
-  @Test
-  public void testValidationMaxChunksInBlobAndRegistrationRequest() {
-    Properties prop = new Properties();
-    Map<String, Object> parameterMap = getStartingParameterMap();
-    parameterMap.put("max_chunks_in_blob", 2);
-    parameterMap.put("max_chunks_in_registration_request", 1);
-    try {
-      new ParameterProvider(parameterMap, prop);
-      Assert.fail("Should not have succeeded");
-    } catch (SFException e) {
-      Assert.assertEquals(ErrorCode.INVALID_CONFIG_PARAMETER.getMessageCode(), e.getVendorCode());
-    }
+    Assert.assertEquals(1, parameterProvider.getMaxChunksInBlobAndRegistrationRequest());
   }
 }
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java
index 1d0279805..11fc0b93b 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientTest.java
@@ -883,14 +883,23 @@ public void testRegisterBlobChunkLimit() throws Exception {
             null));

     assertEquals(0, client.partitionBlobListForRegistrationRequest(new ArrayList<>()).size());
+    assertEquals(
+        1, client.partitionBlobListForRegistrationRequest(createTestBlobMetadata(1)).size());
     assertEquals(
         1, client.partitionBlobListForRegistrationRequest(createTestBlobMetadata(99)).size());
     assertEquals(
         1, client.partitionBlobListForRegistrationRequest(createTestBlobMetadata(100)).size());
+
     assertEquals(
-        2, client.partitionBlobListForRegistrationRequest(createTestBlobMetadata(101)).size());
+        1, client.partitionBlobListForRegistrationRequest(createTestBlobMetadata(3, 95, 2)).size());
+    assertEquals(
+        2,
+        client.partitionBlobListForRegistrationRequest(createTestBlobMetadata(3, 95, 2, 1)).size());
     assertEquals(
-        2, client.partitionBlobListForRegistrationRequest(createTestBlobMetadata(200)).size());
+        3,
+        client
+            .partitionBlobListForRegistrationRequest(createTestBlobMetadata(3, 95, 2, 1, 100))
+            .size());
     assertEquals(
         2, client.partitionBlobListForRegistrationRequest(createTestBlobMetadata(99, 2)).size());
     assertEquals(