SNOW-902709 Limit the max allowed number of chunks in blob
sfc-gh-lsembera committed Sep 11, 2023
1 parent 3a3cbc8 commit df04e8c
Showing 5 changed files with 221 additions and 47 deletions.
@@ -351,6 +351,16 @@ void distributeFlushTasks() {
        if (!leftoverChannelsDataPerTable.isEmpty()) {
          channelsDataPerTable.addAll(leftoverChannelsDataPerTable);
          leftoverChannelsDataPerTable.clear();
        } else if (blobData.size()
            >= this.owningClient.getParameterProvider().getMaxChunksInBlob()) {
          // Create a new blob if the current one already contains max allowed number of chunks
          logger.logInfo(
              "Max allowed number of chunks in the current blob reached. chunkCount={}"
                  + " maxChunkCount={} currentBlobPath={}",
              blobData.size(),
              this.owningClient.getParameterProvider().getMaxChunksInBlob(),
              blobPath);
          break;
        } else {
          ConcurrentHashMap<String, SnowflakeStreamingIngestChannelInternal<T>> table =
              itr.next().getValue();
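For context, the sketch below illustrates the capping behavior this hunk introduces: once a blob has accumulated the maximum allowed number of chunks, it is closed and the remaining per-table data rolls over into the next blob. This is a minimal, self-contained illustration, not the SDK's actual flush loop; the names (distribute, MAX_CHUNKS_PER_BLOB, the table-name strings) are made up for the example.

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class ChunkCapSketch {
  static final int MAX_CHUNKS_PER_BLOB = 20; // mirrors MAX_CHUNKS_IN_BLOB_DEFAULT introduced below

  // Groups pending per-table chunks (represented here by table names) into blobs
  // holding at most MAX_CHUNKS_PER_BLOB chunks each.
  static List<List<String>> distribute(Deque<String> pendingTables) {
    List<List<String>> blobs = new ArrayList<>();
    while (!pendingTables.isEmpty()) {
      List<String> blob = new ArrayList<>();
      while (!pendingTables.isEmpty() && blob.size() < MAX_CHUNKS_PER_BLOB) {
        blob.add(pendingTables.poll());
      }
      blobs.add(blob);
    }
    return blobs;
  }

  public static void main(String[] args) {
    Deque<String> pending = new ArrayDeque<>();
    for (int i = 0; i < 45; i++) {
      pending.add("table" + i);
    }
    // 45 chunks with a cap of 20 per blob -> blob sizes 20, 20 and 5.
    distribute(pending).forEach(blob -> System.out.println(blob.size()));
  }
}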
@@ -452,7 +452,42 @@ ChannelsStatusResponse getChannelsStatus(
   * @param blobs list of uploaded blobs
   */
  void registerBlobs(List<BlobMetadata> blobs) {
    this.registerBlobs(blobs, 0);
    for (List<BlobMetadata> blobBatch : partitionBlobListForRegistrationRequest(blobs)) {
      this.registerBlobs(blobBatch, 0);
    }
  }

  /**
   * Partition the collection of blobs into sub-lists, so that the total number of chunks in each
   * sublist does not exceed the max allowed number of chunks in one registration request.
   */
  List<List<BlobMetadata>> partitionBlobListForRegistrationRequest(List<BlobMetadata> blobs) {
    List<List<BlobMetadata>> result = new ArrayList<>();
    List<BlobMetadata> currentBatch = new ArrayList<>();
    int chunksInCurrentBatch = 0;

    for (BlobMetadata blob : blobs) {
      if (chunksInCurrentBatch + blob.getChunks().size()
          > parameterProvider.getMaxChunksInRegistrationRequest()) {
        // Newly added BDEC file would exceed the max number of chunks in a single registration
        // request. We put chunks collected so far into the result list and create a new batch with
        // the current blob
        result.add(currentBatch);
        currentBatch = new ArrayList<>();
        currentBatch.add(blob);
        chunksInCurrentBatch = blob.getChunks().size();
      } else {
        // Newly added BDEC can be added to the current batch because it does not exceed the max
        // number of chunks in a single registration request, yet.
        currentBatch.add(blob);
        chunksInCurrentBatch += blob.getChunks().size();
      }
    }

    if (!currentBatch.isEmpty()) {
      result.add(currentBatch);
    }
    return result;
  }

  /**
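To make the partitioning behavior concrete, here is a small, self-contained sketch that applies the same greedy grouping to plain chunk counts. The numbers 60, 50 and 40 are made up for illustration; the real method operates on BlobMetadata objects and reads the limit from ParameterProvider.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class RegistrationBatchSketch {
  static final int MAX_CHUNKS_IN_REGISTRATION_REQUEST = 100; // default introduced by this commit

  // Greedily packs per-blob chunk counts into batches of at most
  // MAX_CHUNKS_IN_REGISTRATION_REQUEST chunks each.
  static List<List<Integer>> partition(List<Integer> chunksPerBlob) {
    List<List<Integer>> result = new ArrayList<>();
    List<Integer> currentBatch = new ArrayList<>();
    int chunksInCurrentBatch = 0;
    for (int chunks : chunksPerBlob) {
      if (chunksInCurrentBatch + chunks > MAX_CHUNKS_IN_REGISTRATION_REQUEST) {
        // This blob would push the batch over the limit, so it starts a new batch.
        result.add(currentBatch);
        currentBatch = new ArrayList<>();
        currentBatch.add(chunks);
        chunksInCurrentBatch = chunks;
      } else {
        currentBatch.add(chunks);
        chunksInCurrentBatch += chunks;
      }
    }
    if (!currentBatch.isEmpty()) {
      result.add(currentBatch);
    }
    return result;
  }

  public static void main(String[] args) {
    // Prints [[60], [50, 40]]: 60 + 50 would exceed 100, so the second blob opens a new batch.
    System.out.println(partition(Arrays.asList(60, 50, 40)));
  }
}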
23 changes: 23 additions & 0 deletions src/main/java/net/snowflake/ingest/utils/ParameterProvider.java
@@ -30,6 +30,9 @@ public class ParameterProvider {
  public static final String MAX_CHUNK_SIZE_IN_BYTES = "MAX_CHUNK_SIZE_IN_BYTES".toLowerCase();
  public static final String MAX_ALLOWED_ROW_SIZE_IN_BYTES =
      "MAX_ALLOWED_ROW_SIZE_IN_BYTES".toLowerCase();
  public static final String MAX_CHUNKS_IN_BLOB = "MAX_CHUNKS_IN_BDEC".toLowerCase();
  public static final String MAX_CHUNKS_IN_REGISTRATION_REQUEST =
      "MAX_CHUNKS_IN_REGISTRATION_REQUEST".toLowerCase();

  // Default values
  public static final long BUFFER_FLUSH_INTERVAL_IN_MILLIS_DEFAULT = 1000;
@@ -46,6 +49,8 @@ public class ParameterProvider {
  public static final long MAX_CHANNEL_SIZE_IN_BYTES_DEFAULT = 32000000L;
  public static final long MAX_CHUNK_SIZE_IN_BYTES_DEFAULT = 128000000L;
  public static final long MAX_ALLOWED_ROW_SIZE_IN_BYTES_DEFAULT = 64 * 1024 * 1024; // 64 MB
  public static final int MAX_CHUNKS_IN_BLOB_DEFAULT = 20;
  public static final int MAX_CHUNKS_IN_REGISTRATION_REQUEST_DEFAULT = 100;

  /* Parameter that enables using internal Parquet buffers for buffering of rows before serializing.
  It reduces memory consumption compared to using Java Objects for buffering.*/
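Taken together, the two new defaults imply that a fully packed blob carries at most 20 chunks while a single registration request may carry up to 100 chunks, so one request can register at most 100 / 20 = 5 fully packed blobs (more blobs if they are smaller).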
@@ -289,13 +294,31 @@ public long getMaxChunkSizeInBytes() {
    return (val instanceof String) ? Long.parseLong(val.toString()) : (long) val;
  }

  /** @return The max allowed row size (in bytes) */
  public long getMaxAllowedRowSizeInBytes() {
    Object val =
        this.parameterMap.getOrDefault(
            MAX_ALLOWED_ROW_SIZE_IN_BYTES, MAX_ALLOWED_ROW_SIZE_IN_BYTES_DEFAULT);
    return (val instanceof String) ? Long.parseLong(val.toString()) : (long) val;
  }

  /** @return The max number of chunks that can be put into a single BDEC file */
  public int getMaxChunksInBlob() {
    Object val = this.parameterMap.getOrDefault(MAX_CHUNKS_IN_BLOB, MAX_CHUNKS_IN_BLOB_DEFAULT);
    return (val instanceof String) ? Integer.parseInt(val.toString()) : (int) val;
  }

  /**
   * @return The max number of chunks that can be put into a single BDEC registration request. Must
   *     be higher than MAX_CHUNKS_IN_BDEC.
   */
  public int getMaxChunksInRegistrationRequest() {
    Object val =
        this.parameterMap.getOrDefault(
            MAX_CHUNKS_IN_REGISTRATION_REQUEST, MAX_CHUNKS_IN_REGISTRATION_REQUEST_DEFAULT);
    return (val instanceof String) ? Integer.parseInt(val.toString()) : (int) val;
  }

  @Override
  public String toString() {
    return "ParameterProvider{" + "parameterMap=" + parameterMap + '}';
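One detail worth noting about these getters: values in the parameter map may arrive either as strings (for instance when supplied via a properties file) or as numbers, so each getter parses string values and casts everything else. Below is a minimal stand-alone sketch of that coercion pattern; readInt and the map contents are illustrative only, not SDK API.

import java.util.HashMap;
import java.util.Map;

class ParameterCoercionSketch {
  // Mirrors the getter pattern above: string values are parsed, numeric values are cast.
  static int readInt(Map<String, Object> params, String key, int defaultValue) {
    Object val = params.getOrDefault(key, defaultValue);
    return (val instanceof String) ? Integer.parseInt(val.toString()) : (int) val;
  }

  public static void main(String[] args) {
    Map<String, Object> params = new HashMap<>();
    params.put("max_chunks_in_bdec", "10");                // e.g. read from a properties file
    params.put("max_chunks_in_registration_request", 50);  // e.g. set programmatically
    System.out.println(readInt(params, "max_chunks_in_bdec", 20));                  // 10
    System.out.println(readInt(params, "max_chunks_in_registration_request", 100)); // 50
    System.out.println(readInt(params, "missing_key", 7));                          // 7
  }
}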
@@ -35,6 +35,7 @@
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import javax.crypto.BadPaddingException;
import javax.crypto.IllegalBlockSizeException;
@@ -282,6 +283,21 @@ public void setup() {
    java.security.Security.addProvider(new BouncyCastleProvider());
  }

  private SnowflakeStreamingIngestChannelInternal<?> addChannel(
      TestContext<?> testContext, int tableId) {
    return testContext
        .channelBuilder("channel" + UUID.randomUUID())
        .setDBName("db1")
        .setSchemaName("PUBLIC")
        .setTableName("table" + tableId)
        .setOffsetToken("offset1")
        .setChannelSequencer(0L)
        .setRowSequencer(0L)
        .setEncryptionKey("key")
        .setEncryptionKeyId(1L)
        .buildAndAdd();
  }

  private SnowflakeStreamingIngestChannelInternal<?> addChannel1(TestContext<?> testContext) {
    return testContext
        .channelBuilder("channel1")
Expand Down Expand Up @@ -553,6 +569,21 @@ public void testBlobSplitDueToChunkSizeLimit() throws Exception {
Mockito.verify(flushService, Mockito.times(2)).buildAndUpload(Mockito.any(), Mockito.any());
}

@Test
public void testBlobSplitDueToNumberOfChunks() throws Exception {
TestContext<?> testContext = testContextFactory.create();

for (int i = 0; i < 1111; i++) {
SnowflakeStreamingIngestChannelInternal<?> channel = addChannel(testContext, i / 3);
channel.setupSchema(Collections.singletonList(createLargeTestTextColumn("C1")));
channel.insertRow(Collections.singletonMap("C1", i), "");
}

FlushService<?> flushService = testContext.flushService;
flushService.flush(true).get();
Mockito.verify(flushService, Mockito.times(19)).buildAndUpload(Mockito.any(), Mockito.any());
}
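The expected count of 19 uploads can be reasoned out from the setup: the 1111 channels are assigned to tables numbered i / 3, which yields 371 distinct tables (0 through 370); each table's buffered data presumably becomes one chunk per flush, and with the default cap of 20 chunks per blob that gives ceil(371 / 20) = 19 blobs, hence 19 buildAndUpload invocations.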

  @Test
  public void testBuildAndUpload() throws Exception {
    long expectedBuildLatencyMs = 100;