From bb014dcbb42caf8051fd746908b9f2a4aa8e9aa2 Mon Sep 17 00:00:00 2001 From: Toby Zhang Date: Wed, 4 Oct 2023 05:49:31 +0000 Subject: [PATCH] save progress --- .../ingest/streaming/OpenChannelRequest.java | 3 +- .../streaming/internal/AbstractRowBuffer.java | 46 ++++++++++++++++--- 2 files changed, 41 insertions(+), 8 deletions(-) diff --git a/src/main/java/net/snowflake/ingest/streaming/OpenChannelRequest.java b/src/main/java/net/snowflake/ingest/streaming/OpenChannelRequest.java index 0e8487b1c..05bde361b 100644 --- a/src/main/java/net/snowflake/ingest/streaming/OpenChannelRequest.java +++ b/src/main/java/net/snowflake/ingest/streaming/OpenChannelRequest.java @@ -12,8 +12,7 @@ public class OpenChannelRequest { public enum OnErrorOption { CONTINUE, // CONTINUE loading the rows, and return all the errors in the response ABORT, // ABORT the entire batch, and throw an exception when we hit the first error - SKIP_AT_FIRST_ERROR, // Skip the rows after the first error, and return the first - // error in the response + SKIP_BATCH, // Skip the batch after the first error, and return all the errors in the response } /** diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java index 840284406..ff56bb227 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java @@ -305,8 +305,7 @@ public InsertValidationResponse insertRows( this.flushLock.lock(); try { this.channelState.updateInsertStats(System.currentTimeMillis(), this.bufferedRowCount); - if (onErrorOption == OpenChannelRequest.OnErrorOption.CONTINUE - || onErrorOption == OpenChannelRequest.OnErrorOption.SKIP_AT_FIRST_ERROR) { + if (onErrorOption == OpenChannelRequest.OnErrorOption.CONTINUE) { // Used to map incoming row(nth row) to InsertError(for nth row) in response int rowIndex = 0; for (Map row : rows) { @@ -327,10 +326,6 @@ public InsertValidationResponse insertRows( } checkBatchSizeEnforcedMaximum(rowsSizeInBytes); rowIndex++; - if (onErrorOption == OpenChannelRequest.OnErrorOption.SKIP_AT_FIRST_ERROR - && response.hasErrors()) { - break; - } if (this.bufferedRowCount == Integer.MAX_VALUE) { throw new SFException(ErrorCode.INTERNAL_ERROR, "Row count reaches MAX value"); } @@ -351,6 +346,45 @@ public InsertValidationResponse insertRows( moveTempRowsToActualBuffer(tempRowCount); + rowsSizeInBytes = tempRowsSizeInBytes; + if ((long) this.bufferedRowCount + tempRowCount >= Integer.MAX_VALUE) { + throw new SFException(ErrorCode.INTERNAL_ERROR, "Row count reaches MAX value"); + } + this.bufferedRowCount += tempRowCount; + this.statsMap.forEach( + (colName, stats) -> + this.statsMap.put( + colName, + RowBufferStats.getCombinedStats(stats, this.tempStatsMap.get(colName)))); + } else if (onErrorOption == OpenChannelRequest.OnErrorOption.SKIP_BATCH) { + float tempRowsSizeInBytes = 0F; + int tempRowCount = 0; + for (Map row : rows) { + InsertValidationResponse.InsertError error = + new InsertValidationResponse.InsertError(row, tempRowCount); + try { + Set inputColumnNames = verifyInputColumns(row, error, tempRowCount); + tempRowsSizeInBytes += + addTempRow(row, tempRowCount, this.tempStatsMap, inputColumnNames, tempRowCount); + tempRowCount++; + } catch (SFException e) { + error.setException(e); + response.addError(error); + } catch (Throwable e) { + logger.logWarn("Unexpected error happens during insertRows: {}", e); + error.setException(new SFException(e, ErrorCode.INTERNAL_ERROR, e.getMessage())); + response.addError(error); + } + tempRowCount++; + if (this.bufferedRowCount == Integer.MAX_VALUE) { + throw new SFException(ErrorCode.INTERNAL_ERROR, "Row count reaches MAX value"); + } + } + checkBatchSizeEnforcedMaximum(tempRowsSizeInBytes); + checkBatchSizeRecommendedMaximum(tempRowsSizeInBytes); + + moveTempRowsToActualBuffer(tempRowCount); + rowsSizeInBytes = tempRowsSizeInBytes; if ((long) this.bufferedRowCount + tempRowCount >= Integer.MAX_VALUE) { throw new SFException(ErrorCode.INTERNAL_ERROR, "Row count reaches MAX value");