Commit
Merge parameters into 1
sfc-gh-lsembera committed Oct 18, 2023
1 parent 5c93ef8 commit 7166bde
Showing 7 changed files with 52 additions and 73 deletions.
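
For callers, the two old knobs collapse into a single override. A minimal sketch of configuring it from application code (connection properties elided; factory usage per the SDK's public streaming API, and the parameter name taken from ParameterProvider below):

    import java.util.Properties;
    import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;
    import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClientFactory;
    import net.snowflake.ingest.utils.ParameterProvider;

    public class MergedChunkLimitExample {
      public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // ... account, user, private key, and other connection properties ...

        // One parameter now caps chunks per BDEC blob and per registration
        // request alike (default 100; see ParameterProvider below).
        props.put(ParameterProvider.MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST, 2);

        try (SnowflakeStreamingIngestClient client =
            SnowflakeStreamingIngestClientFactory.builder("EXAMPLE_CLIENT")
                .setProperties(props)
                .build()) {
          // Channels opened from this client split blobs after 2 chunks,
          // mirroring the ManyTablesIT setup further down this diff.
        }
      }
    }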
@@ -351,13 +351,15 @@ void distributeFlushTasks() {
             channelsDataPerTable.addAll(leftoverChannelsDataPerTable);
             leftoverChannelsDataPerTable.clear();
           } else if (blobData.size()
-              >= this.owningClient.getParameterProvider().getMaxChunksInBlob()) {
+              >= this.owningClient
+                  .getParameterProvider()
+                  .getMaxChunksInBlobAndRegistrationRequest()) {
             // Create a new blob if the current one already contains max allowed number of chunks
             logger.logInfo(
                 "Max allowed number of chunks in the current blob reached. chunkCount={}"
                     + " maxChunkCount={} currentBlobPath={}",
                 blobData.size(),
-                this.owningClient.getParameterProvider().getMaxChunksInBlob(),
+                this.owningClient.getParameterProvider().getMaxChunksInBlobAndRegistrationRequest(),
                 blobPath);
             break;
           } else {
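
With both call sites reading the same limit, a blob can no longer be assembled with more chunks than a single registration request accepts, which is the invariant the removed validateParameters() check used to enforce (see ParameterProvider.java below).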
@@ -465,10 +465,21 @@ List<List<BlobMetadata>> partitionBlobListForRegistrationRequest(List<BlobMetada
     List<List<BlobMetadata>> result = new ArrayList<>();
     List<BlobMetadata> currentBatch = new ArrayList<>();
     int chunksInCurrentBatch = 0;
+    int maxChunksInBlobAndRegistrationRequest =
+        parameterProvider.getMaxChunksInBlobAndRegistrationRequest();
 
     for (BlobMetadata blob : blobs) {
-      if (chunksInCurrentBatch + blob.getChunks().size()
-          > parameterProvider.getMaxChunksInRegistrationRequest()) {
+      if (blob.getChunks().size() > maxChunksInBlobAndRegistrationRequest) {
+        throw new SFException(
+            ErrorCode.INTERNAL_ERROR,
+            String.format(
+                "Incorrectly generated blob detected - number of chunks in the blob is larger than"
+                    + " the max allowed number of chunks. Please report this bug to Snowflake."
+                    + " bdec=%s chunkCount=%d maxAllowedChunkCount=%d",
+                blob.getPath(), blob.getChunks().size(), maxChunksInBlobAndRegistrationRequest));
+      }
+
+      if (chunksInCurrentBatch + blob.getChunks().size() > maxChunksInBlobAndRegistrationRequest) {
         // Newly added BDEC file would exceed the max number of chunks in a single registration
         // request. We put chunks collected so far into the result list and create a new batch with
         // the current blob
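
Viewed in isolation, the batching rule is greedy packing with an oversize guard. A simplified standalone sketch (hypothetical names, plain ints instead of the SDK's BlobMetadata/SFException types):

    import java.util.ArrayList;
    import java.util.List;

    class PartitionSketch {
      // Pack blobs (represented by their chunk counts) into registration
      // batches of at most maxChunks total chunks each; reject oversized blobs.
      static List<List<Integer>> partition(List<Integer> chunkCounts, int maxChunks) {
        List<List<Integer>> batches = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        int chunksInCurrent = 0;
        for (int chunks : chunkCounts) {
          if (chunks > maxChunks) {
            // Mirrors the new SFException: such a blob should never be built.
            throw new IllegalStateException("blob has too many chunks: " + chunks);
          }
          if (chunksInCurrent + chunks > maxChunks) {
            batches.add(current); // close the full batch and start a new one
            current = new ArrayList<>();
            chunksInCurrent = 0;
          }
          current.add(chunks);
          chunksInCurrent += chunks;
        }
        if (!current.isEmpty()) {
          batches.add(current);
        }
        return batches;
      }
    }

With maxChunks = 100, chunk counts [3, 95, 2] pack into one batch, while [3, 95, 2, 1] spills the 1-chunk blob into a second batch, matching the expectations asserted in the client test at the bottom of this diff.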
44 changes: 10 additions & 34 deletions src/main/java/net/snowflake/ingest/utils/ParameterProvider.java
@@ -31,9 +31,8 @@ public class ParameterProvider {
   public static final String MAX_CHUNK_SIZE_IN_BYTES = "MAX_CHUNK_SIZE_IN_BYTES".toLowerCase();
   public static final String MAX_ALLOWED_ROW_SIZE_IN_BYTES =
       "MAX_ALLOWED_ROW_SIZE_IN_BYTES".toLowerCase();
-  public static final String MAX_CHUNKS_IN_BLOB = "MAX_CHUNKS_IN_BLOB".toLowerCase();
-  public static final String MAX_CHUNKS_IN_REGISTRATION_REQUEST =
-      "MAX_CHUNKS_IN_REGISTRATION_REQUEST".toLowerCase();
+  public static final String MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST =
+      "MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST".toLowerCase();
 
   public static final String MAX_CLIENT_LAG = "MAX_CLIENT_LAG".toLowerCase();
 
@@ -62,8 +61,7 @@ public class ParameterProvider {
 
   static final long MAX_CLIENT_LAG_MS_MAX = TimeUnit.MINUTES.toMillis(10);
   public static final long MAX_ALLOWED_ROW_SIZE_IN_BYTES_DEFAULT = 64 * 1024 * 1024; // 64 MB
-  public static final int MAX_CHUNKS_IN_BLOB_DEFAULT = 20;
-  public static final int MAX_CHUNKS_IN_REGISTRATION_REQUEST_DEFAULT = 100;
+  public static final int MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_DEFAULT = 100;
 
   /* Parameter that enables using internal Parquet buffers for buffering of rows before serializing.
   It reduces memory consumption compared to using Java Objects for buffering.*/
@@ -84,7 +82,6 @@ public class ParameterProvider {
    */
   public ParameterProvider(Map<String, Object> parameterOverrides, Properties props) {
     this.setParameterMap(parameterOverrides, props);
-    this.validateParameters();
   }
 
   /** Empty constructor for tests */
@@ -176,10 +173,9 @@ private void setParameterMap(Map<String, Object> parameterOverrides, Properties
     this.updateValue(MAX_CLIENT_LAG, MAX_CLIENT_LAG_DEFAULT, parameterOverrides, props);
     this.updateValue(
         MAX_CLIENT_LAG_ENABLED, MAX_CLIENT_LAG_ENABLED_DEFAULT, parameterOverrides, props);
-    this.updateValue(MAX_CHUNKS_IN_BLOB, MAX_CHUNKS_IN_BLOB_DEFAULT, parameterOverrides, props);
     this.updateValue(
-        MAX_CHUNKS_IN_REGISTRATION_REQUEST,
-        MAX_CHUNKS_IN_REGISTRATION_REQUEST_DEFAULT,
+        MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST,
+        MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_DEFAULT,
         parameterOverrides,
         props);
   }
@@ -389,38 +385,18 @@ public long getMaxAllowedRowSizeInBytes() {
     return (val instanceof String) ? Long.parseLong(val.toString()) : (long) val;
   }
 
-  /** @return The max number of chunks that can be put into a single BDEC file */
-  public int getMaxChunksInBlob() {
-    Object val = this.parameterMap.getOrDefault(MAX_CHUNKS_IN_BLOB, MAX_CHUNKS_IN_BLOB_DEFAULT);
-    return (val instanceof String) ? Integer.parseInt(val.toString()) : (int) val;
-  }
-
   /**
-   * @return The max number of chunks that can be put into a single BDEC registration request. Must
-   *     be higher than MAX_CHUNKS_IN_BLOB.
+   * @return The max number of chunks that can be put into a single BDEC or blob registration
+   *     request.
    */
-  public int getMaxChunksInRegistrationRequest() {
+  public int getMaxChunksInBlobAndRegistrationRequest() {
     Object val =
         this.parameterMap.getOrDefault(
-            MAX_CHUNKS_IN_REGISTRATION_REQUEST, MAX_CHUNKS_IN_REGISTRATION_REQUEST_DEFAULT);
+            MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST,
+            MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_DEFAULT);
     return (val instanceof String) ? Integer.parseInt(val.toString()) : (int) val;
   }
 
-  /** Validates parameters */
-  private void validateParameters() {
-    if (this.getMaxChunksInBlob() >= this.getMaxChunksInRegistrationRequest()) {
-      throw new SFException(
-          ErrorCode.INVALID_CONFIG_PARAMETER,
-          String.format(
-              "Value of configuration property %s (%d) must be smaller than the value of"
-                  + " configuration property %s (%d).",
-              MAX_CHUNKS_IN_BLOB,
-              getMaxChunksInBlob(),
-              MAX_CHUNKS_IN_REGISTRATION_REQUEST,
-              getMaxChunksInRegistrationRequest()));
-    }
-  }
-
   @Override
   public String toString() {
     return "ParameterProvider{" + "parameterMap=" + parameterMap + '}';
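
Two consequences of the merge show up here: the constructor no longer needs a validateParameters() pass, since a single limit leaves no cross-parameter ordering to check, and the getter's instanceof coercion means the override may arrive as either a number or a string. An illustrative sketch of both forms (hypothetical map, same key):

    Map<String, Object> overrides = new HashMap<>();
    overrides.put(ParameterProvider.MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST, 50);   // boxed int: cast path
    overrides.put(ParameterProvider.MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST, "50"); // String: Integer.parseInt path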
@@ -582,7 +582,7 @@ public void runTestBlobSplitDueToNumberOfChunks(int numberOfRows) throws Excepti
         Math.ceil(
             (double) numberOfRows
                 / channelsPerTable
-                / ParameterProvider.MAX_CHUNKS_IN_BLOB_DEFAULT);
+                / ParameterProvider.MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST_DEFAULT);
 
     final TestContext<List<List<Object>>> testContext = testContextFactory.create();
 
@@ -612,7 +612,7 @@ public void runTestBlobSplitDueToNumberOfChunks(int numberOfRows) throws Excepti
   public void testBlobSplitDueToNumberOfChunksWithLeftoverChannels() throws Exception {
     final TestContext<List<List<Object>>> testContext = testContextFactory.create();
 
-    for (int i = 0; i < 19; i++) { // 19 simple chunks
+    for (int i = 0; i < 99; i++) { // 99 simple chunks
       SnowflakeStreamingIngestChannelInternal<List<List<Object>>> channel =
           addChannel(testContext, i, 1);
       channel.setupSchema(Collections.singletonList(createLargeTestTextColumn("C1")));
@@ -622,19 +622,19 @@ public void testBlobSplitDueToNumberOfChunksWithLeftoverChannels() throws Except
-    // 20th chunk would contain multiple channels, but there are some with different encryption key
-    // ID, so they spill to a new blob
+    // 100th chunk would contain multiple channels, but there are some with different encryption
+    // key ID, so they spill to a new blob
     SnowflakeStreamingIngestChannelInternal<List<List<Object>>> channel1 =
-        addChannel(testContext, 19, 1);
+        addChannel(testContext, 99, 1);
     channel1.setupSchema(Collections.singletonList(createLargeTestTextColumn("C1")));
-    channel1.insertRow(Collections.singletonMap("C1", 19), "");
+    channel1.insertRow(Collections.singletonMap("C1", 0), "");
 
     SnowflakeStreamingIngestChannelInternal<List<List<Object>>> channel2 =
-        addChannel(testContext, 19, 2);
+        addChannel(testContext, 99, 2);
     channel2.setupSchema(Collections.singletonList(createLargeTestTextColumn("C1")));
-    channel2.insertRow(Collections.singletonMap("C1", 19), "");
+    channel2.insertRow(Collections.singletonMap("C1", 0), "");
 
     SnowflakeStreamingIngestChannelInternal<List<List<Object>>> channel3 =
-        addChannel(testContext, 19, 2);
+        addChannel(testContext, 99, 2);
     channel3.setupSchema(Collections.singletonList(createLargeTestTextColumn("C1")));
-    channel3.insertRow(Collections.singletonMap("C1", 19), "");
+    channel3.insertRow(Collections.singletonMap("C1", 0), "");
 
     FlushService<List<List<Object>>> flushService = testContext.flushService;
     flushService.flush(true).get();
@@ -648,7 +648,7 @@ public void testBlobSplitDueToNumberOfChunksWithLeftoverChannels() throws Except
     List<List<List<ChannelData<List<List<Object>>>>>> allUploadedBlobs =
         blobDataCaptor.getAllValues();
 
-    Assert.assertEquals(22, getRows(allUploadedBlobs).size());
+    Assert.assertEquals(102, getRows(allUploadedBlobs).size());
   }
 
   private List<List<Object>> getRows(List<List<List<ChannelData<List<List<Object>>>>>> blobs) {
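
The new expectation follows from the setup arithmetic: the 99 loop channels contribute one row each and channel1 through channel3 contribute one row apiece, for 102 rows in total (the old test was 19 + 3 = 22).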
@@ -26,8 +26,8 @@
  */
 public class ManyTablesIT {
 
-  private static final int TABLES_COUNT = 100;
-  private static final int TOTAL_ROWS_COUNT = 20_000;
+  private static final int TABLES_COUNT = 20;
+  private static final int TOTAL_ROWS_COUNT = 200_000;
   private String dbName;
   private SnowflakeStreamingIngestClient client;
   private Connection connection;
@@ -37,8 +37,7 @@ public class ManyTablesIT {
   @Before
   public void setUp() throws Exception {
     Properties props = TestUtils.getProperties(Constants.BdecVersion.THREE, false);
-    props.put(ParameterProvider.MAX_CHUNKS_IN_BLOB, 2);
-    props.put(ParameterProvider.MAX_CHUNKS_IN_REGISTRATION_REQUEST, 3);
+    props.put(ParameterProvider.MAX_CHUNKS_IN_BLOB_AND_REGISTRATION_REQUEST, 2);
     if (props.getProperty(ROLE).equals("DEFAULT_ROLE")) {
       props.setProperty(ROLE, "ACCOUNTADMIN");
     }
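
Capping the merged parameter at 2 keeps this integration test meaningful after the merge: since chunks are per-table, ingesting into 20 tables with at most two chunks per blob and per registration request still forces frequent blob splits and partitioned registration.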
@@ -3,9 +3,7 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
-import net.snowflake.ingest.utils.ErrorCode;
 import net.snowflake.ingest.utils.ParameterProvider;
-import net.snowflake.ingest.utils.SFException;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -279,24 +277,8 @@ public void testMaxClientLagEnabledThresholdAbove() {
   public void testMaxChunksInBlobAndRegistrationRequest() {
     Properties prop = new Properties();
     Map<String, Object> parameterMap = getStartingParameterMap();
-    parameterMap.put("max_chunks_in_blob", 1);
-    parameterMap.put("max_chunks_in_registration_request", 2);
+    parameterMap.put("max_chunks_in_blob_and_registration_request", 1);
     ParameterProvider parameterProvider = new ParameterProvider(parameterMap, prop);
-    Assert.assertEquals(1, parameterProvider.getMaxChunksInBlob());
-    Assert.assertEquals(2, parameterProvider.getMaxChunksInRegistrationRequest());
-  }
-
-  @Test
-  public void testValidationMaxChunksInBlobAndRegistrationRequest() {
-    Properties prop = new Properties();
-    Map<String, Object> parameterMap = getStartingParameterMap();
-    parameterMap.put("max_chunks_in_blob", 2);
-    parameterMap.put("max_chunks_in_registration_request", 1);
-    try {
-      new ParameterProvider(parameterMap, prop);
-      Assert.fail("Should not have succeeded");
-    } catch (SFException e) {
-      Assert.assertEquals(ErrorCode.INVALID_CONFIG_PARAMETER.getMessageCode(), e.getVendorCode());
-    }
+    Assert.assertEquals(1, parameterProvider.getMaxChunksInBlobAndRegistrationRequest());
   }
 }
@@ -883,14 +883,23 @@ public void testRegisterBlobChunkLimit() throws Exception {
             null));
 
     assertEquals(0, client.partitionBlobListForRegistrationRequest(new ArrayList<>()).size());
     assertEquals(
         1, client.partitionBlobListForRegistrationRequest(createTestBlobMetadata(1)).size());
     assertEquals(
         1, client.partitionBlobListForRegistrationRequest(createTestBlobMetadata(99)).size());
     assertEquals(
         1, client.partitionBlobListForRegistrationRequest(createTestBlobMetadata(100)).size());
 
     assertEquals(
-        2, client.partitionBlobListForRegistrationRequest(createTestBlobMetadata(101)).size());
+        1, client.partitionBlobListForRegistrationRequest(createTestBlobMetadata(3, 95, 2)).size());
     assertEquals(
-        2, client.partitionBlobListForRegistrationRequest(createTestBlobMetadata(200)).size());
+        2,
+        client.partitionBlobListForRegistrationRequest(createTestBlobMetadata(3, 95, 2, 1)).size());
+    assertEquals(
+        3,
+        client
+            .partitionBlobListForRegistrationRequest(createTestBlobMetadata(3, 95, 2, 1, 100))
+            .size());
+    assertEquals(
+        2, client.partitionBlobListForRegistrationRequest(createTestBlobMetadata(99, 2)).size());
+    assertEquals(
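
The expected batch counts are straight arithmetic against the 100-chunk default: 3 + 95 + 2 = 100 fills exactly one request; adding a 1-chunk blob (101 chunks) forces a second; appending a further 100-chunk blob forces a third; and [99, 2] splits into two because 99 + 2 exceeds the cap.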
