SNOW-1258064 Remove/Relax the limitations in order to generate bigger files #730

Merged
8 commits merged on Apr 3, 2024
@@ -202,7 +202,6 @@ public InsertValidationResponse insertRows(
tempRowsSizeInBytes +=
addTempRow(row, tempRowCount, rowBuffer.tempStatsMap, inputColumnNames, tempRowCount);
tempRowCount++;
checkBatchSizeEnforcedMaximum(tempRowsSizeInBytes);
if ((long) rowBuffer.bufferedRowCount + tempRowCount >= Integer.MAX_VALUE) {
throw new SFException(ErrorCode.INTERNAL_ERROR, "Row count reaches MAX value");
}
@@ -261,7 +260,6 @@ public InsertValidationResponse insertRows(
response.addError(error);
}
rowIndex++;
checkBatchSizeEnforcedMaximum(tempRowsSizeInBytes);
if ((long) rowBuffer.bufferedRowCount + rowIndex >= Integer.MAX_VALUE) {
throw new SFException(ErrorCode.INTERNAL_ERROR, "Row count reaches MAX value");
}
@@ -673,15 +671,6 @@ static <T> AbstractRowBuffer<T> createRowBuffer(
}
}

private void checkBatchSizeEnforcedMaximum(float batchSizeInBytes) {
Contributor

Should this still be here but with higher limits?

Collaborator

+1

Contributor

  1. This check was originally added to avoid OOM issues in the scanner. Given that the Parquet scanner is much better now, and the Iceberg docs mention that it works well for files up to 1 GB compressed, I don't think it is necessary anymore.
  2. We still warn on this, see checkBatchSizeRecommendedMaximum right below this function.
  3. We have valid use cases where the batch size can be big, e.g. when the Kafka Connector has a long flush interval and everything gets inserted as one batch (see the sketch below).
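
A minimal sketch of that third scenario, assuming a hypothetical connector-style buffer that collects rows for a whole flush interval and hands them to the channel in one call. The FlushingBuffer class and its error handling are illustrative only; the channel type and the insertRows(Iterable, String) signature are the SDK's:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import net.snowflake.ingest.streaming.InsertValidationResponse;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;

/** Illustrative only: rows pile up between flushes, so one flush means one insertRows() batch. */
class FlushingBuffer {
  private final SnowflakeStreamingIngestChannel channel;
  private final List<Map<String, Object>> pending = new ArrayList<>();

  FlushingBuffer(SnowflakeStreamingIngestChannel channel) {
    this.channel = channel;
  }

  void add(Map<String, Object> row) {
    pending.add(row); // accumulates for the whole flush interval
  }

  /** Called once per flush interval; a long interval means a very large single batch. */
  void flush(String offsetToken) {
    if (pending.isEmpty()) {
      return;
    }
    InsertValidationResponse response = channel.insertRows(pending, offsetToken);
    if (response.hasErrors()) {
      throw new RuntimeException(response.getInsertErrors().get(0).getException());
    }
    pending.clear();
  }
}
```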

Contributor

Sorry, I somehow missed this PR until it got merged. I have concerns about this change because there was another reason why these limits were introduced: to prevent long GC pauses. We observed JVM freezes for customers who were passing hundreds of thousands of rows into insertRows(); see more details here.

I would prefer to keep some limit there; customers should not pass hundreds of megabytes of data in a single batch.

cc @sfc-gh-wfateem
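
For callers that do end up with very large row sets, a mitigation in line with this concern is to cap the batch size before it reaches insertRows(). A minimal sketch, assuming rows arrive as a List of Map<String, Object>; the 10,000-row cap and the offset-token scheme are arbitrary illustrations, not SDK constants:

```java
import java.util.List;
import java.util.Map;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;

final class BatchSplitter {
  private static final int MAX_ROWS_PER_CALL = 10_000; // illustrative cap, not an SDK constant

  /** Splits a large row set into smaller insertRows() calls to avoid one huge allocation spike. */
  static void insertInChunks(
      SnowflakeStreamingIngestChannel channel, List<Map<String, Object>> rows, long baseOffset) {
    for (int start = 0; start < rows.size(); start += MAX_ROWS_PER_CALL) {
      int end = Math.min(start + MAX_ROWS_PER_CALL, rows.size());
      // Offset token marks the last row of the sub-batch so a replay after failure stays correct.
      String offsetToken = String.valueOf(baseOffset + end - 1);
      channel.insertRows(rows.subList(start, end), offsetToken);
    }
  }
}
```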

Contributor

@sfc-gh-alhuang Do you want to run some local tests to see whether things work with a few GBs in a batch and a 1-minute flush interval? If not, we can add the exception back but with a much higher limit, WDYT?
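
One way to approximate that local test, assuming an already-open SnowflakeStreamingIngestChannel. The ~1 MB row payload and the target size are arbitrary, the column name mirrors the COLBINARY column used in the test change further down, and the 1-minute flush interval would be configured separately on the client (for example via the MAX_CLIENT_LAG setting):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;

final class BigBatchSmokeTest {
  /** Inserts roughly targetBytes of binary data as one batch and reports how long the call took. */
  static void run(SnowflakeStreamingIngestChannel channel, long targetBytes) {
    byte[] payload = new byte[1024 * 1024]; // ~1 MB per row; table/column layout is assumed
    List<Map<String, Object>> rows = new ArrayList<>();
    for (long bytes = 0; bytes < targetBytes; bytes += payload.length) {
      rows.add(Collections.singletonMap("COLBINARY", payload));
    }
    long start = System.nanoTime();
    channel.insertRows(rows, "0"); // a single multi-GB batch, the scenario under discussion
    long elapsedMs = (System.nanoTime() - start) / 1_000_000;
    System.out.printf("Inserted %d rows in %d ms%n", rows.size(), elapsedMs);
  }
}
```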

Contributor Author

Got a GCLocker timeout error message after building 600+ MB blobs. I'll add the assertion back with a higher limit.

if (batchSizeInBytes > clientBufferParameters.getMaxChunkSizeInBytes()) {
throw new SFException(
ErrorCode.MAX_BATCH_SIZE_EXCEEDED,
clientBufferParameters.getMaxChunkSizeInBytes(),
INSERT_ROWS_RECOMMENDED_MAX_BATCH_SIZE_IN_BYTES);
}
}

private void checkBatchSizeRecommendedMaximum(float batchSizeInBytes) {
if (batchSizeInBytes > INSERT_ROWS_RECOMMENDED_MAX_BATCH_SIZE_IN_BYTES) {
logger.logWarn(
src/main/java/net/snowflake/ingest/utils/Constants.java (1 addition & 1 deletion)
@@ -42,7 +42,7 @@ public class Constants {
7L; // Don't change, should match server side
public static final int BLOB_UPLOAD_TIMEOUT_IN_SEC = 5;
public static final int INSERT_THROTTLE_MAX_RETRY_COUNT = 60;
public static final long MAX_BLOB_SIZE_IN_BYTES = 256000000L;
public static final long MAX_BLOB_SIZE_IN_BYTES = 1024 * 1024 * 1024;
public static final int BLOB_TAG_SIZE_IN_BYTES = 4;
public static final int BLOB_VERSION_SIZE_IN_BYTES = 1;
public static final int BLOB_FILE_SIZE_SIZE_IN_BYTES = 8;
src/main/java/net/snowflake/ingest/utils/ErrorCode.java (2 additions & 3 deletions)
@@ -40,9 +40,8 @@ public enum ErrorCode {
MAKE_URI_FAILURE("0032"),
OAUTH_REFRESH_TOKEN_ERROR("0033"),
INVALID_CONFIG_PARAMETER("0034"),
MAX_BATCH_SIZE_EXCEEDED("0035"),
CRYPTO_PROVIDER_ERROR("0036"),
DROP_CHANNEL_FAILURE("0037");
CRYPTO_PROVIDER_ERROR("0035"),
DROP_CHANNEL_FAILURE("0036");

public static final String errorMessageResource = "net.snowflake.ingest.ingest_error_messages";

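A side effect of this hunk worth noting: deleting MAX_BATCH_SIZE_EXCEEDED shifts the codes of the constants after it, so CRYPTO_PROVIDER_ERROR moves from "0036" to "0035" and DROP_CHANNEL_FAILURE from "0037" to "0036". Anything that matches on the raw code string rather than on the enum would see different values. A tiny check, assuming getMessageCode() returns the code string given in the enum declaration:

```java
import net.snowflake.ingest.utils.ErrorCode;

class VendorCodeRenumbering {
  public static void main(String[] args) {
    // After this PR these print "0035" and "0036"; before, they printed "0036" and "0037".
    System.out.println(ErrorCode.CRYPTO_PROVIDER_ERROR.getMessageCode());
    System.out.println(ErrorCode.DROP_CHANNEL_FAILURE.getMessageCode());
  }
}
```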
@@ -50,8 +50,8 @@ public class ParameterProvider {
public static final int IO_TIME_CPU_RATIO_DEFAULT = 2;
public static final int BLOB_UPLOAD_MAX_RETRY_COUNT_DEFAULT = 24;
public static final long MAX_MEMORY_LIMIT_IN_BYTES_DEFAULT = -1L;
public static final long MAX_CHANNEL_SIZE_IN_BYTES_DEFAULT = 32000000L;
public static final long MAX_CHUNK_SIZE_IN_BYTES_DEFAULT = 128000000L;
public static final long MAX_CHANNEL_SIZE_IN_BYTES_DEFAULT = 128 * 1024 * 1024;
public static final long MAX_CHUNK_SIZE_IN_BYTES_DEFAULT = 512 * 1024 * 1024;

// Lag related parameters
public static final long MAX_CLIENT_LAG_DEFAULT = 1000; // 1 second
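The new defaults line up with the relaxed blob limit above: 128 * 1024 * 1024 bytes (128 MiB) per channel and 512 * 1024 * 1024 bytes (512 MiB) per chunk, up from 32 MB and 128 MB (decimal). A quick sanity check of the ordering channel ≤ chunk ≤ blob; treating that ordering as an intended invariant is my assumption, as is ParameterProvider living in net.snowflake.ingest.utils alongside the other files in this diff:

```java
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.ParameterProvider;

class LimitOrderingCheck {
  public static void main(String[] args) {
    long channel = ParameterProvider.MAX_CHANNEL_SIZE_IN_BYTES_DEFAULT; // 134,217,728 (128 MiB)
    long chunk = ParameterProvider.MAX_CHUNK_SIZE_IN_BYTES_DEFAULT;     // 536,870,912 (512 MiB)
    long blob = Constants.MAX_BLOB_SIZE_IN_BYTES;                       // 1,073,741,824 (1 GiB)
    // Assumed invariant: a channel flushes before a chunk fills, and chunks fit inside a blob.
    System.out.println(channel <= chunk && chunk <= blob); // true with the new defaults
  }
}
```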
@@ -1021,17 +1021,8 @@ private void testMaxInsertRowsBatchSizeHelper(OpenChannelRequest.OnErrorOption o
}

// Insert rows should succeed
innerBuffer.insertRows(rows, "", "");

// After adding another row, it should fail due to too large batch of rows passed to
// insertRows() in one go
rows.add(Collections.singletonMap("COLBINARY", arr));
try {
innerBuffer.insertRows(rows, "", "");
Assert.fail("Inserting rows should have failed");
} catch (SFException e) {
Assert.assertEquals(ErrorCode.MAX_BATCH_SIZE_EXCEEDED.getMessageCode(), e.getVendorCode());
}
innerBuffer.insertRows(rows, "", "");
}

@Test