From 570894c9ac287abc22f8243594847f80cafa4bae Mon Sep 17 00:00:00 2001 From: Toby Zhang Date: Tue, 17 Oct 2023 12:47:55 -0700 Subject: [PATCH] NO-SNOW: Fix two issues in insertRows API (#602) This PR contains two fixes in the current insertRows logic: - We can't throw any exception when ON_ERROR=CONTINUE, since that would cause duplicate data if we insert rows into the buffer without updating the offset token - Update the row buffer only if the rows are inserted successfully for ON_ERROR=SKIP_BATCH --- .../streaming/internal/AbstractRowBuffer.java | 25 +++++++++---------- .../streaming/internal/RowBufferTest.java | 1 - 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java index 30be56fcb..3029e7a49 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java @@ -150,6 +150,9 @@ public InsertValidationResponse insertRows( InsertValidationResponse.InsertError error = new InsertValidationResponse.InsertError(row, rowIndex); try { + if (rowBuffer.bufferedRowCount == Integer.MAX_VALUE) { + throw new SFException(ErrorCode.INTERNAL_ERROR, "Row count reaches MAX value"); + } Set inputColumnNames = verifyInputColumns(row, error, rowIndex); rowsSizeInBytes += addRow( @@ -163,11 +166,7 @@ public InsertValidationResponse insertRows( error.setException(new SFException(e, ErrorCode.INTERNAL_ERROR, e.getMessage())); response.addError(error); } - checkBatchSizeEnforcedMaximum(rowsSizeInBytes); rowIndex++; - if (rowBuffer.bufferedRowCount == Integer.MAX_VALUE) { - throw new SFException(ErrorCode.INTERNAL_ERROR, "Row count reaches MAX value"); - } } checkBatchSizeRecommendedMaximum(rowsSizeInBytes); rowBuffer.channelState.setOffsetToken(offsetToken); @@ -191,17 +190,17 @@ public InsertValidationResponse 
insertRows( Set inputColumnNames = verifyInputColumns(row, null, tempRowCount); tempRowsSizeInBytes += addTempRow(row, tempRowCount, rowBuffer.tempStatsMap, inputColumnNames, tempRowCount); - checkBatchSizeEnforcedMaximum(tempRowsSizeInBytes); tempRowCount++; + checkBatchSizeEnforcedMaximum(tempRowsSizeInBytes); + if ((long) rowBuffer.bufferedRowCount + tempRowCount >= Integer.MAX_VALUE) { + throw new SFException(ErrorCode.INTERNAL_ERROR, "Row count reaches MAX value"); + } } checkBatchSizeRecommendedMaximum(tempRowsSizeInBytes); moveTempRowsToActualBuffer(tempRowCount); rowsSizeInBytes = tempRowsSizeInBytes; - if ((long) rowBuffer.bufferedRowCount + tempRowCount >= Integer.MAX_VALUE) { - throw new SFException(ErrorCode.INTERNAL_ERROR, "Row count reaches MAX value"); - } rowBuffer.bufferedRowCount += tempRowCount; rowBuffer.statsMap.forEach( (colName, stats) -> @@ -241,15 +240,15 @@ public InsertValidationResponse insertRows( response.addError(error); } checkBatchSizeEnforcedMaximum(tempRowsSizeInBytes); + if ((long) rowBuffer.bufferedRowCount + tempRowCount >= Integer.MAX_VALUE) { + throw new SFException(ErrorCode.INTERNAL_ERROR, "Row count reaches MAX value"); + } } if (!response.hasErrors()) { checkBatchSizeRecommendedMaximum(tempRowsSizeInBytes); moveTempRowsToActualBuffer(tempRowCount); rowsSizeInBytes = tempRowsSizeInBytes; - if ((long) rowBuffer.bufferedRowCount + tempRowCount >= Integer.MAX_VALUE) { - throw new SFException(ErrorCode.INTERNAL_ERROR, "Row count reaches MAX value"); - } rowBuffer.bufferedRowCount += tempRowCount; rowBuffer.statsMap.forEach( (colName, stats) -> @@ -257,9 +256,9 @@ public InsertValidationResponse insertRows( colName, RowBufferStats.getCombinedStats(stats, rowBuffer.tempStatsMap.get(colName)))); rowBuffer.channelState.setOffsetToken(offsetToken); + rowBuffer.bufferSize += rowsSizeInBytes; + rowBuffer.rowSizeMetric.accept(rowsSizeInBytes); } - rowBuffer.bufferSize += rowsSizeInBytes; - 
rowBuffer.rowSizeMetric.accept(rowsSizeInBytes); return response; } } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java index 549eefbde..269b6f28d 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java @@ -945,7 +945,6 @@ private void testE2ETimeHelper(OpenChannelRequest.OnErrorOption onErrorOption) { @Test public void testMaxInsertRowsBatchSize() { - testMaxInsertRowsBatchSizeHelper(OpenChannelRequest.OnErrorOption.CONTINUE); testMaxInsertRowsBatchSizeHelper(OpenChannelRequest.OnErrorOption.ABORT); testMaxInsertRowsBatchSizeHelper(OpenChannelRequest.OnErrorOption.SKIP_BATCH); }