Skip to content

Commit

Permalink
save progress
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-tzhang committed Oct 4, 2023
1 parent 654afeb commit bb014dc
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ public class OpenChannelRequest {
public enum OnErrorOption {
CONTINUE, // CONTINUE loading the rows, and return all the errors in the response
ABORT, // ABORT the entire batch, and throw an exception when we hit the first error
SKIP_AT_FIRST_ERROR, // Skip the rows after the first error, and return the first
// error in the response
SKIP_BATCH, // Skip the batch after the first error, and return all the errors in the response
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,8 +305,7 @@ public InsertValidationResponse insertRows(
this.flushLock.lock();
try {
this.channelState.updateInsertStats(System.currentTimeMillis(), this.bufferedRowCount);
if (onErrorOption == OpenChannelRequest.OnErrorOption.CONTINUE
|| onErrorOption == OpenChannelRequest.OnErrorOption.SKIP_AT_FIRST_ERROR) {
if (onErrorOption == OpenChannelRequest.OnErrorOption.CONTINUE) {
// Used to map incoming row(nth row) to InsertError(for nth row) in response
int rowIndex = 0;
for (Map<String, Object> row : rows) {
Expand All @@ -327,10 +326,6 @@ public InsertValidationResponse insertRows(
}
checkBatchSizeEnforcedMaximum(rowsSizeInBytes);
rowIndex++;
if (onErrorOption == OpenChannelRequest.OnErrorOption.SKIP_AT_FIRST_ERROR
&& response.hasErrors()) {
break;
}
if (this.bufferedRowCount == Integer.MAX_VALUE) {
throw new SFException(ErrorCode.INTERNAL_ERROR, "Row count reaches MAX value");
}
Expand All @@ -351,6 +346,45 @@ public InsertValidationResponse insertRows(

moveTempRowsToActualBuffer(tempRowCount);

rowsSizeInBytes = tempRowsSizeInBytes;
if ((long) this.bufferedRowCount + tempRowCount >= Integer.MAX_VALUE) {
throw new SFException(ErrorCode.INTERNAL_ERROR, "Row count reaches MAX value");
}
this.bufferedRowCount += tempRowCount;
this.statsMap.forEach(
(colName, stats) ->
this.statsMap.put(
colName,
RowBufferStats.getCombinedStats(stats, this.tempStatsMap.get(colName))));
} else if (onErrorOption == OpenChannelRequest.OnErrorOption.SKIP_BATCH) {
float tempRowsSizeInBytes = 0F;
int tempRowCount = 0;
for (Map<String, Object> row : rows) {
InsertValidationResponse.InsertError error =
new InsertValidationResponse.InsertError(row, tempRowCount);
try {
Set<String> inputColumnNames = verifyInputColumns(row, error, tempRowCount);
tempRowsSizeInBytes +=
addTempRow(row, tempRowCount, this.tempStatsMap, inputColumnNames, tempRowCount);
tempRowCount++;
} catch (SFException e) {
error.setException(e);
response.addError(error);
} catch (Throwable e) {
logger.logWarn("Unexpected error happens during insertRows: {}", e);
error.setException(new SFException(e, ErrorCode.INTERNAL_ERROR, e.getMessage()));
response.addError(error);
}
tempRowCount++;
if (this.bufferedRowCount == Integer.MAX_VALUE) {
throw new SFException(ErrorCode.INTERNAL_ERROR, "Row count reaches MAX value");
}
}
checkBatchSizeEnforcedMaximum(tempRowsSizeInBytes);
checkBatchSizeRecommendedMaximum(tempRowsSizeInBytes);

moveTempRowsToActualBuffer(tempRowCount);

rowsSizeInBytes = tempRowsSizeInBytes;
if ((long) this.bufferedRowCount + tempRowCount >= Integer.MAX_VALUE) {
throw new SFException(ErrorCode.INTERNAL_ERROR, "Row count reaches MAX value");
Expand Down

0 comments on commit bb014dc

Please sign in to comment.