Skip to content

Commit

Permalink
NO-SNOW: Fix two issues in insertRows API (#602)
Browse files Browse the repository at this point in the history
This PR contains two fixes in the current insertRows logic:

- We can't throw any exception when ON_ERROR=CONTINUE, since doing so would cause duplicate data if rows have already been inserted into the buffer without the offset token being updated
- Update the row buffer only if the rows are inserted successfully for ON_ERROR=SKIP_BATCH
  • Loading branch information
sfc-gh-tzhang authored Oct 17, 2023
1 parent 5baaf19 commit 570894c
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,9 @@ public InsertValidationResponse insertRows(
InsertValidationResponse.InsertError error =
new InsertValidationResponse.InsertError(row, rowIndex);
try {
if (rowBuffer.bufferedRowCount == Integer.MAX_VALUE) {
throw new SFException(ErrorCode.INTERNAL_ERROR, "Row count reaches MAX value");
}
Set<String> inputColumnNames = verifyInputColumns(row, error, rowIndex);
rowsSizeInBytes +=
addRow(
Expand All @@ -163,11 +166,7 @@ public InsertValidationResponse insertRows(
error.setException(new SFException(e, ErrorCode.INTERNAL_ERROR, e.getMessage()));
response.addError(error);
}
checkBatchSizeEnforcedMaximum(rowsSizeInBytes);
rowIndex++;
if (rowBuffer.bufferedRowCount == Integer.MAX_VALUE) {
throw new SFException(ErrorCode.INTERNAL_ERROR, "Row count reaches MAX value");
}
}
checkBatchSizeRecommendedMaximum(rowsSizeInBytes);
rowBuffer.channelState.setOffsetToken(offsetToken);
Expand All @@ -191,17 +190,17 @@ public InsertValidationResponse insertRows(
Set<String> inputColumnNames = verifyInputColumns(row, null, tempRowCount);
tempRowsSizeInBytes +=
addTempRow(row, tempRowCount, rowBuffer.tempStatsMap, inputColumnNames, tempRowCount);
checkBatchSizeEnforcedMaximum(tempRowsSizeInBytes);
tempRowCount++;
checkBatchSizeEnforcedMaximum(tempRowsSizeInBytes);
if ((long) rowBuffer.bufferedRowCount + tempRowCount >= Integer.MAX_VALUE) {
throw new SFException(ErrorCode.INTERNAL_ERROR, "Row count reaches MAX value");
}
}
checkBatchSizeRecommendedMaximum(tempRowsSizeInBytes);

moveTempRowsToActualBuffer(tempRowCount);

rowsSizeInBytes = tempRowsSizeInBytes;
if ((long) rowBuffer.bufferedRowCount + tempRowCount >= Integer.MAX_VALUE) {
throw new SFException(ErrorCode.INTERNAL_ERROR, "Row count reaches MAX value");
}
rowBuffer.bufferedRowCount += tempRowCount;
rowBuffer.statsMap.forEach(
(colName, stats) ->
Expand Down Expand Up @@ -241,25 +240,25 @@ public InsertValidationResponse insertRows(
response.addError(error);
}
checkBatchSizeEnforcedMaximum(tempRowsSizeInBytes);
if ((long) rowBuffer.bufferedRowCount + tempRowCount >= Integer.MAX_VALUE) {
throw new SFException(ErrorCode.INTERNAL_ERROR, "Row count reaches MAX value");
}
}

if (!response.hasErrors()) {
checkBatchSizeRecommendedMaximum(tempRowsSizeInBytes);
moveTempRowsToActualBuffer(tempRowCount);
rowsSizeInBytes = tempRowsSizeInBytes;
if ((long) rowBuffer.bufferedRowCount + tempRowCount >= Integer.MAX_VALUE) {
throw new SFException(ErrorCode.INTERNAL_ERROR, "Row count reaches MAX value");
}
rowBuffer.bufferedRowCount += tempRowCount;
rowBuffer.statsMap.forEach(
(colName, stats) ->
rowBuffer.statsMap.put(
colName,
RowBufferStats.getCombinedStats(stats, rowBuffer.tempStatsMap.get(colName))));
rowBuffer.channelState.setOffsetToken(offsetToken);
rowBuffer.bufferSize += rowsSizeInBytes;
rowBuffer.rowSizeMetric.accept(rowsSizeInBytes);
}
rowBuffer.bufferSize += rowsSizeInBytes;
rowBuffer.rowSizeMetric.accept(rowsSizeInBytes);
return response;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -945,7 +945,6 @@ private void testE2ETimeHelper(OpenChannelRequest.OnErrorOption onErrorOption) {

@Test
public void testMaxInsertRowsBatchSize() {
testMaxInsertRowsBatchSizeHelper(OpenChannelRequest.OnErrorOption.CONTINUE);
testMaxInsertRowsBatchSizeHelper(OpenChannelRequest.OnErrorOption.ABORT);
testMaxInsertRowsBatchSizeHelper(OpenChannelRequest.OnErrorOption.SKIP_BATCH);
}
Expand Down

0 comments on commit 570894c

Please sign in to comment.