Skip to content

Commit

Permalink
save progress
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-tzhang committed Oct 4, 2023
1 parent bb014dc commit 6e20f47
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ public InsertValidationResponse insertRows(
}
}
checkBatchSizeRecommendedMaximum(rowsSizeInBytes);
this.channelState.setOffsetToken(offsetToken);
} else if (onErrorOption == OpenChannelRequest.OnErrorOption.ABORT) {
// If the on_error option is ABORT, simply throw the first exception
float tempRowsSizeInBytes = 0F;
Expand All @@ -356,6 +357,7 @@ public InsertValidationResponse insertRows(
this.statsMap.put(
colName,
RowBufferStats.getCombinedStats(stats, this.tempStatsMap.get(colName))));
this.channelState.setOffsetToken(offsetToken);
} else if (onErrorOption == OpenChannelRequest.OnErrorOption.SKIP_BATCH) {
float tempRowsSizeInBytes = 0F;
int tempRowCount = 0;
Expand All @@ -375,30 +377,26 @@ public InsertValidationResponse insertRows(
error.setException(new SFException(e, ErrorCode.INTERNAL_ERROR, e.getMessage()));
response.addError(error);
}
tempRowCount++;
if (this.bufferedRowCount == Integer.MAX_VALUE) {
throw new SFException(ErrorCode.INTERNAL_ERROR, "Row count reaches MAX value");
}
checkBatchSizeEnforcedMaximum(tempRowsSizeInBytes);
}
checkBatchSizeEnforcedMaximum(tempRowsSizeInBytes);
checkBatchSizeRecommendedMaximum(tempRowsSizeInBytes);

moveTempRowsToActualBuffer(tempRowCount);

rowsSizeInBytes = tempRowsSizeInBytes;
if ((long) this.bufferedRowCount + tempRowCount >= Integer.MAX_VALUE) {
throw new SFException(ErrorCode.INTERNAL_ERROR, "Row count reaches MAX value");
if (!response.hasErrors()) {
checkBatchSizeRecommendedMaximum(tempRowsSizeInBytes);
moveTempRowsToActualBuffer(tempRowCount);
rowsSizeInBytes = tempRowsSizeInBytes;
if ((long) this.bufferedRowCount + tempRowCount >= Integer.MAX_VALUE) {
throw new SFException(ErrorCode.INTERNAL_ERROR, "Row count reaches MAX value");
}
this.bufferedRowCount += tempRowCount;
this.statsMap.forEach(
(colName, stats) ->
this.statsMap.put(
colName,
RowBufferStats.getCombinedStats(stats, this.tempStatsMap.get(colName))));
this.channelState.setOffsetToken(offsetToken);
}
this.bufferedRowCount += tempRowCount;
this.statsMap.forEach(
(colName, stats) ->
this.statsMap.put(
colName,
RowBufferStats.getCombinedStats(stats, this.tempStatsMap.get(colName))));
}

this.bufferSize += rowsSizeInBytes;
this.channelState.setOffsetToken(offsetToken);
this.rowSizeMetric.accept(rowsSizeInBytes);
} finally {
this.tempStatsMap.values().forEach(RowBufferStats::reset);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ public void setupSchema(List<ColumnMetadata> columns) {
this.statsMap.put(
column.getInternalName(), new RowBufferStats(column.getName(), column.getCollation()));

if (onErrorOption == OpenChannelRequest.OnErrorOption.ABORT) {
if (onErrorOption == OpenChannelRequest.OnErrorOption.ABORT
|| onErrorOption == OpenChannelRequest.OnErrorOption.SKIP_BATCH) {
this.tempStatsMap.put(
column.getInternalName(), new RowBufferStats(column.getName(), column.getCollation()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public class RowBufferTest {
private final boolean enableParquetMemoryOptimization;
private AbstractRowBuffer<?> rowBufferOnErrorContinue;
private AbstractRowBuffer<?> rowBufferOnErrorAbort;
private AbstractRowBuffer<?> rowBufferOnErrorSkipBatch;

public RowBufferTest() {
this.enableParquetMemoryOptimization = false;
Expand All @@ -39,9 +40,11 @@ public RowBufferTest() {
public void setupRowBuffer() {
this.rowBufferOnErrorContinue = createTestBuffer(OpenChannelRequest.OnErrorOption.CONTINUE);
this.rowBufferOnErrorAbort = createTestBuffer(OpenChannelRequest.OnErrorOption.ABORT);
this.rowBufferOnErrorSkipBatch = createTestBuffer(OpenChannelRequest.OnErrorOption.SKIP_BATCH);
List<ColumnMetadata> schema = createSchema();
this.rowBufferOnErrorContinue.setupSchema(schema);
this.rowBufferOnErrorAbort.setupSchema(schema);
this.rowBufferOnErrorSkipBatch.setupSchema(schema);
}

static List<ColumnMetadata> createSchema() {
Expand Down Expand Up @@ -467,6 +470,7 @@ private void testFlushHelper(AbstractRowBuffer<?> rowBuffer) {
public void testDoubleQuotesColumnName() {
testDoubleQuotesColumnNameHelper(OpenChannelRequest.OnErrorOption.ABORT);
testDoubleQuotesColumnNameHelper(OpenChannelRequest.OnErrorOption.CONTINUE);
testDoubleQuotesColumnNameHelper(OpenChannelRequest.OnErrorOption.SKIP_BATCH);
}

private void testDoubleQuotesColumnNameHelper(OpenChannelRequest.OnErrorOption onErrorOption) {
Expand Down Expand Up @@ -739,6 +743,7 @@ private void testStatsE2EHelper(AbstractRowBuffer<?> rowBuffer) {
public void testStatsE2ETimestamp() {
testStatsE2ETimestampHelper(OpenChannelRequest.OnErrorOption.ABORT);
testStatsE2ETimestampHelper(OpenChannelRequest.OnErrorOption.CONTINUE);
testStatsE2ETimestampHelper(OpenChannelRequest.OnErrorOption.SKIP_BATCH);
}

private void testStatsE2ETimestampHelper(OpenChannelRequest.OnErrorOption onErrorOption) {
Expand Down Expand Up @@ -820,6 +825,7 @@ private void testStatsE2ETimestampHelper(OpenChannelRequest.OnErrorOption onErro
public void testE2EDate() {
testE2EDateHelper(OpenChannelRequest.OnErrorOption.ABORT);
testE2EDateHelper(OpenChannelRequest.OnErrorOption.CONTINUE);
testE2EDateHelper(OpenChannelRequest.OnErrorOption.SKIP_BATCH);
}

private void testE2EDateHelper(OpenChannelRequest.OnErrorOption onErrorOption) {
Expand Down Expand Up @@ -868,6 +874,7 @@ private void testE2EDateHelper(OpenChannelRequest.OnErrorOption onErrorOption) {
public void testE2ETime() {
testE2ETimeHelper(OpenChannelRequest.OnErrorOption.ABORT);
testE2ETimeHelper(OpenChannelRequest.OnErrorOption.CONTINUE);
testE2ETimeHelper(OpenChannelRequest.OnErrorOption.SKIP_BATCH);
}

private void testE2ETimeHelper(OpenChannelRequest.OnErrorOption onErrorOption) {
Expand Down Expand Up @@ -940,6 +947,7 @@ private void testE2ETimeHelper(OpenChannelRequest.OnErrorOption onErrorOption) {
public void testMaxInsertRowsBatchSize() {
testMaxInsertRowsBatchSizeHelper(OpenChannelRequest.OnErrorOption.CONTINUE);
testMaxInsertRowsBatchSizeHelper(OpenChannelRequest.OnErrorOption.ABORT);
testMaxInsertRowsBatchSizeHelper(OpenChannelRequest.OnErrorOption.SKIP_BATCH);
}

private void testMaxInsertRowsBatchSizeHelper(OpenChannelRequest.OnErrorOption onErrorOption) {
Expand Down Expand Up @@ -977,6 +985,7 @@ private void testMaxInsertRowsBatchSizeHelper(OpenChannelRequest.OnErrorOption o
public void testNullableCheck() {
testNullableCheckHelper(OpenChannelRequest.OnErrorOption.ABORT);
testNullableCheckHelper(OpenChannelRequest.OnErrorOption.CONTINUE);
testNullableCheckHelper(OpenChannelRequest.OnErrorOption.SKIP_BATCH);
}

private void testNullableCheckHelper(OpenChannelRequest.OnErrorOption onErrorOption) {
Expand Down Expand Up @@ -1016,6 +1025,7 @@ private void testNullableCheckHelper(OpenChannelRequest.OnErrorOption onErrorOpt
public void testMissingColumnCheck() {
testMissingColumnCheckHelper(OpenChannelRequest.OnErrorOption.ABORT);
testMissingColumnCheckHelper(OpenChannelRequest.OnErrorOption.CONTINUE);
testMissingColumnCheckHelper(OpenChannelRequest.OnErrorOption.SKIP_BATCH);
}

private void testMissingColumnCheckHelper(OpenChannelRequest.OnErrorOption onErrorOption) {
Expand Down Expand Up @@ -1087,13 +1097,10 @@ public void testExtraColumnsCheck() {
}

@Test
public void testFailureHalfwayThroughColumnProcessingAbort() {
doTestFailureHalfwayThroughColumnProcessing(OpenChannelRequest.OnErrorOption.ABORT);
}

@Test
public void testFailureHalfwayThroughColumnProcessingContinue() {
public void testFailureHalfwayThroughColumnProcessing() {
doTestFailureHalfwayThroughColumnProcessing(OpenChannelRequest.OnErrorOption.CONTINUE);
doTestFailureHalfwayThroughColumnProcessing(OpenChannelRequest.OnErrorOption.ABORT);
doTestFailureHalfwayThroughColumnProcessing(OpenChannelRequest.OnErrorOption.SKIP_BATCH);
}

private void doTestFailureHalfwayThroughColumnProcessing(
Expand Down Expand Up @@ -1170,6 +1177,7 @@ private void doTestFailureHalfwayThroughColumnProcessing(
public void testE2EBoolean() {
testE2EBooleanHelper(OpenChannelRequest.OnErrorOption.ABORT);
testE2EBooleanHelper(OpenChannelRequest.OnErrorOption.CONTINUE);
testE2EBooleanHelper(OpenChannelRequest.OnErrorOption.SKIP_BATCH);
}

private void testE2EBooleanHelper(OpenChannelRequest.OnErrorOption onErrorOption) {
Expand Down Expand Up @@ -1218,6 +1226,7 @@ private void testE2EBooleanHelper(OpenChannelRequest.OnErrorOption onErrorOption
public void testE2EBinary() {
testE2EBinaryHelper(OpenChannelRequest.OnErrorOption.ABORT);
testE2EBinaryHelper(OpenChannelRequest.OnErrorOption.CONTINUE);
testE2EBinaryHelper(OpenChannelRequest.OnErrorOption.SKIP_BATCH);
}

private void testE2EBinaryHelper(OpenChannelRequest.OnErrorOption onErrorOption) {
Expand Down Expand Up @@ -1274,6 +1283,7 @@ private void testE2EBinaryHelper(OpenChannelRequest.OnErrorOption onErrorOption)
public void testE2EReal() {
testE2ERealHelper(OpenChannelRequest.OnErrorOption.ABORT);
testE2ERealHelper(OpenChannelRequest.OnErrorOption.CONTINUE);
testE2ERealHelper(OpenChannelRequest.OnErrorOption.SKIP_BATCH);
}

private void testE2ERealHelper(OpenChannelRequest.OnErrorOption onErrorOption) {
Expand Down Expand Up @@ -1393,10 +1403,87 @@ public void testOnErrorAbortFailures() {
Assert.assertEquals(0, innerBuffer.bufferedRowCount);
}

@Test
public void testOnErrorAbortSkipBatch() {
AbstractRowBuffer<?> innerBuffer = createTestBuffer(OpenChannelRequest.OnErrorOption.ABORT);

ColumnMetadata colDecimal = new ColumnMetadata();
colDecimal.setName("COLDECIMAL");
colDecimal.setPhysicalType("SB16");
colDecimal.setNullable(true);
colDecimal.setLogicalType("FIXED");
colDecimal.setPrecision(38);
colDecimal.setScale(0);

innerBuffer.setupSchema(Collections.singletonList(colDecimal));
Map<String, Object> row = new HashMap<>();
row.put("COLDECIMAL", 1);

InsertValidationResponse response = innerBuffer.insertRows(Collections.singletonList(row), "1");
Assert.assertFalse(response.hasErrors());

Assert.assertEquals(1, innerBuffer.bufferedRowCount);
Assert.assertEquals(0, innerBuffer.getTempRowCount());
Assert.assertEquals(
1, innerBuffer.statsMap.get("COLDECIMAL").getCurrentMaxIntValue().intValue());
Assert.assertEquals(
1, innerBuffer.statsMap.get("COLDECIMAL").getCurrentMinIntValue().intValue());
Assert.assertNull(innerBuffer.tempStatsMap.get("COLDECIMAL").getCurrentMaxIntValue());
Assert.assertNull(innerBuffer.tempStatsMap.get("COLDECIMAL").getCurrentMinIntValue());

Map<String, Object> row2 = new HashMap<>();
row2.put("COLDECIMAL", 2);
response = innerBuffer.insertRows(Collections.singletonList(row2), "2");
Assert.assertFalse(response.hasErrors());

Assert.assertEquals(2, innerBuffer.bufferedRowCount);
Assert.assertEquals(0, innerBuffer.getTempRowCount());
Assert.assertEquals(
2, innerBuffer.statsMap.get("COLDECIMAL").getCurrentMaxIntValue().intValue());
Assert.assertEquals(
1, innerBuffer.statsMap.get("COLDECIMAL").getCurrentMinIntValue().intValue());
Assert.assertNull(innerBuffer.tempStatsMap.get("COLDECIMAL").getCurrentMaxIntValue());
Assert.assertNull(innerBuffer.tempStatsMap.get("COLDECIMAL").getCurrentMinIntValue());

Map<String, Object> row3 = new HashMap<>();
row3.put("COLDECIMAL", true);
try {
innerBuffer.insertRows(Collections.singletonList(row3), "3");
} catch (SFException e) {
Assert.assertEquals(ErrorCode.INVALID_FORMAT_ROW.getMessageCode(), e.getVendorCode());
}

Assert.assertEquals(2, innerBuffer.bufferedRowCount);
Assert.assertEquals(0, innerBuffer.getTempRowCount());
Assert.assertEquals(
2, innerBuffer.statsMap.get("COLDECIMAL").getCurrentMaxIntValue().intValue());
Assert.assertEquals(
1, innerBuffer.statsMap.get("COLDECIMAL").getCurrentMinIntValue().intValue());
Assert.assertNull(innerBuffer.tempStatsMap.get("COLDECIMAL").getCurrentMaxIntValue());
Assert.assertNull(innerBuffer.tempStatsMap.get("COLDECIMAL").getCurrentMinIntValue());

row3.put("COLDECIMAL", 3);
response = innerBuffer.insertRows(Collections.singletonList(row3), "3");
Assert.assertFalse(response.hasErrors());
Assert.assertEquals(3, innerBuffer.bufferedRowCount);
Assert.assertEquals(0, innerBuffer.getTempRowCount());
Assert.assertEquals(
3, innerBuffer.statsMap.get("COLDECIMAL").getCurrentMaxIntValue().intValue());
Assert.assertEquals(
1, innerBuffer.statsMap.get("COLDECIMAL").getCurrentMinIntValue().intValue());
Assert.assertNull(innerBuffer.tempStatsMap.get("COLDECIMAL").getCurrentMaxIntValue());
Assert.assertNull(innerBuffer.tempStatsMap.get("COLDECIMAL").getCurrentMinIntValue());

ChannelData<?> data = innerBuffer.flush("my_snowpipe_streaming.bdec");
Assert.assertEquals(3, data.getRowCount());
Assert.assertEquals(0, innerBuffer.bufferedRowCount);
}

@Test
public void testE2EVariant() {
testE2EVariantHelper(OpenChannelRequest.OnErrorOption.ABORT);
testE2EVariantHelper(OpenChannelRequest.OnErrorOption.CONTINUE);
testE2EVariantHelper(OpenChannelRequest.OnErrorOption.SKIP_BATCH);
}

private void testE2EVariantHelper(OpenChannelRequest.OnErrorOption onErrorOption) {
Expand Down Expand Up @@ -1446,6 +1533,7 @@ private void testE2EVariantHelper(OpenChannelRequest.OnErrorOption onErrorOption
public void testE2EObject() {
testE2EObjectHelper(OpenChannelRequest.OnErrorOption.ABORT);
testE2EObjectHelper(OpenChannelRequest.OnErrorOption.CONTINUE);
testE2EObjectHelper(OpenChannelRequest.OnErrorOption.SKIP_BATCH);
}

private void testE2EObjectHelper(OpenChannelRequest.OnErrorOption onErrorOption) {
Expand Down Expand Up @@ -1477,6 +1565,7 @@ private void testE2EObjectHelper(OpenChannelRequest.OnErrorOption onErrorOption)
public void testE2EArray() {
testE2EArrayHelper(OpenChannelRequest.OnErrorOption.ABORT);
testE2EArrayHelper(OpenChannelRequest.OnErrorOption.CONTINUE);
testE2EArrayHelper(OpenChannelRequest.OnErrorOption.SKIP_BATCH);
}

private void testE2EArrayHelper(OpenChannelRequest.OnErrorOption onErrorOption) {
Expand Down Expand Up @@ -1527,6 +1616,8 @@ public void testOnErrorAbortRowsWithError() {
createTestBuffer(OpenChannelRequest.OnErrorOption.CONTINUE);
AbstractRowBuffer<?> innerBufferOnErrorAbort =
createTestBuffer(OpenChannelRequest.OnErrorOption.ABORT);
AbstractRowBuffer<?> innerBufferOnErrorSkipBatch =
createTestBuffer(OpenChannelRequest.OnErrorOption.SKIP_BATCH);

ColumnMetadata colChar = new ColumnMetadata();
colChar.setName("COLCHAR");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,30 @@ public void testInsertTooLargeRow() {
Assert.assertEquals(ErrorCode.MAX_ROW_SIZE_EXCEEDED.getMessageCode(), e.getVendorCode());
Assert.assertEquals(expectedMessage, e.getMessage());
}

// Test channel with on error SKIP_BATCH
channel =
new SnowflakeStreamingIngestChannelInternal<>(
"channel",
"db",
"schema",
"table",
"0",
0L,
0L,
client,
"key",
1234L,
OpenChannelRequest.OnErrorOption.SKIP_BATCH,
UTC);
channel.setupSchema(schema);

insertValidationResponse = channel.insertRow(row, "token-1");
Assert.assertEquals(1, insertValidationResponse.getErrorRowCount());
thrownException = insertValidationResponse.getInsertErrors().get(0).getException();
Assert.assertEquals(
ErrorCode.MAX_ROW_SIZE_EXCEEDED.getMessageCode(), thrownException.getVendorCode());
Assert.assertEquals(expectedMessage, thrownException.getMessage());
}

@Test
Expand Down

0 comments on commit 6e20f47

Please sign in to comment.