SNOW-906224: Add new ON_ERROR=SKIP_BATCH option (#597)
Introduce a new ON_ERROR option SKIP_BATCH

- Skips the entire batch
- Returns all the erroneous rows as part of the response, along with their indexes and error messages

Pros:
- Easy implementation

Cons:
- Worse performance: data validation will be done twice for the good rows
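
For reference, a minimal client-side sketch of how the new option would be used. This is not part of the commit; the builder, insertRows, and InsertValidationResponse calls are the SDK's existing public API as I understand it, and the channel, database, schema, and table names are placeholders:

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import net.snowflake.ingest.streaming.InsertValidationResponse;
import net.snowflake.ingest.streaming.OpenChannelRequest;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;

public class SkipBatchExample {
  // Assumes an already-constructed streaming ingest client
  static void ingest(SnowflakeStreamingIngestClient client) {
    OpenChannelRequest request =
        OpenChannelRequest.builder("MY_CHANNEL")
            .setDBName("MY_DB")
            .setSchemaName("MY_SCHEMA")
            .setTableName("MY_TABLE")
            .setOnErrorOption(OpenChannelRequest.OnErrorOption.SKIP_BATCH)
            .build();
    SnowflakeStreamingIngestChannel channel = client.openChannel(request);

    Map<String, Object> row = new HashMap<>();
    row.put("C1", 1);
    InsertValidationResponse response =
        channel.insertRows(Collections.singletonList(row), "offset-1");

    if (response.hasErrors()) {
      // With SKIP_BATCH, none of the rows in this batch were ingested;
      // every invalid row is reported with its index and error
      for (InsertValidationResponse.InsertError error : response.getInsertErrors()) {
        System.err.println(
            "Row " + error.getRowIndex() + " rejected: " + error.getException().getMessage());
      }
      // Repair the reported rows and retry the whole batch
    }
  }
}

Because the batch is skipped wholesale on any error, the caller can resend the entire corrected batch without risking partial ingestion.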
sfc-gh-tzhang authored Oct 13, 2023
1 parent 8fb07f3 commit 5baaf19
Showing 7 changed files with 342 additions and 66 deletions.
@@ -12,6 +12,8 @@ public class OpenChannelRequest {
  public enum OnErrorOption {
    CONTINUE, // CONTINUE loading the rows, and return all the errors in the response
    ABORT, // ABORT the entire batch, and throw an exception when we hit the first error
+    SKIP_BATCH, // If an error in the batch is detected, return a response containing all error row
+                // indexes. No data is ingested
  }

@@ -138,6 +138,132 @@ public int getOrdinal() {
}
}

/** Insert rows function strategy for ON_ERROR=CONTINUE */
public class ContinueIngestionStrategy<T> implements IngestionStrategy<T> {
@Override
public InsertValidationResponse insertRows(
AbstractRowBuffer<T> rowBuffer, Iterable<Map<String, Object>> rows, String offsetToken) {
InsertValidationResponse response = new InsertValidationResponse();
float rowsSizeInBytes = 0F;
int rowIndex = 0;
for (Map<String, Object> row : rows) {
InsertValidationResponse.InsertError error =
new InsertValidationResponse.InsertError(row, rowIndex);
try {
Set<String> inputColumnNames = verifyInputColumns(row, error, rowIndex);
rowsSizeInBytes +=
addRow(
row, rowBuffer.bufferedRowCount, rowBuffer.statsMap, inputColumnNames, rowIndex);
rowBuffer.bufferedRowCount++;
} catch (SFException e) {
error.setException(e);
response.addError(error);
} catch (Throwable e) {
logger.logWarn("Unexpected error happens during insertRows: {}", e);
error.setException(new SFException(e, ErrorCode.INTERNAL_ERROR, e.getMessage()));
response.addError(error);
}
checkBatchSizeEnforcedMaximum(rowsSizeInBytes);
rowIndex++;
if (rowBuffer.bufferedRowCount == Integer.MAX_VALUE) {
throw new SFException(ErrorCode.INTERNAL_ERROR, "Row count reaches MAX value");
}
}
checkBatchSizeRecommendedMaximum(rowsSizeInBytes);
rowBuffer.channelState.setOffsetToken(offsetToken);
rowBuffer.bufferSize += rowsSizeInBytes;
rowBuffer.rowSizeMetric.accept(rowsSizeInBytes);
return response;
}
}

/** Insert rows function strategy for ON_ERROR=ABORT */
public class AbortIngestionStrategy<T> implements IngestionStrategy<T> {
@Override
public InsertValidationResponse insertRows(
AbstractRowBuffer<T> rowBuffer, Iterable<Map<String, Object>> rows, String offsetToken) {
// If the on_error option is ABORT, simply throw the first exception
InsertValidationResponse response = new InsertValidationResponse();
float rowsSizeInBytes = 0F;
float tempRowsSizeInBytes = 0F;
int tempRowCount = 0;
for (Map<String, Object> row : rows) {
Set<String> inputColumnNames = verifyInputColumns(row, null, tempRowCount);
tempRowsSizeInBytes +=
addTempRow(row, tempRowCount, rowBuffer.tempStatsMap, inputColumnNames, tempRowCount);
checkBatchSizeEnforcedMaximum(tempRowsSizeInBytes);
tempRowCount++;
}
checkBatchSizeRecommendedMaximum(tempRowsSizeInBytes);

moveTempRowsToActualBuffer(tempRowCount);

rowsSizeInBytes = tempRowsSizeInBytes;
if ((long) rowBuffer.bufferedRowCount + tempRowCount >= Integer.MAX_VALUE) {
throw new SFException(ErrorCode.INTERNAL_ERROR, "Row count reaches MAX value");
}
rowBuffer.bufferedRowCount += tempRowCount;
rowBuffer.statsMap.forEach(
(colName, stats) ->
rowBuffer.statsMap.put(
colName,
RowBufferStats.getCombinedStats(stats, rowBuffer.tempStatsMap.get(colName))));
rowBuffer.channelState.setOffsetToken(offsetToken);
rowBuffer.bufferSize += rowsSizeInBytes;
rowBuffer.rowSizeMetric.accept(rowsSizeInBytes);
return response;
}
}

/** Insert rows function strategy for ON_ERROR=SKIP_BATCH */
public class SkipBatchIngestionStrategy<T> implements IngestionStrategy<T> {
@Override
public InsertValidationResponse insertRows(
AbstractRowBuffer<T> rowBuffer, Iterable<Map<String, Object>> rows, String offsetToken) {
InsertValidationResponse response = new InsertValidationResponse();
float rowsSizeInBytes = 0F;
float tempRowsSizeInBytes = 0F;
int tempRowCount = 0;
for (Map<String, Object> row : rows) {
InsertValidationResponse.InsertError error =
new InsertValidationResponse.InsertError(row, tempRowCount);
try {
Set<String> inputColumnNames = verifyInputColumns(row, error, tempRowCount);
tempRowsSizeInBytes +=
addTempRow(row, tempRowCount, rowBuffer.tempStatsMap, inputColumnNames, tempRowCount);
tempRowCount++;
} catch (SFException e) {
error.setException(e);
response.addError(error);
} catch (Throwable e) {
logger.logWarn("Unexpected error happens during insertRows: {}", e);
error.setException(new SFException(e, ErrorCode.INTERNAL_ERROR, e.getMessage()));
response.addError(error);
}
checkBatchSizeEnforcedMaximum(tempRowsSizeInBytes);
}

if (!response.hasErrors()) {
checkBatchSizeRecommendedMaximum(tempRowsSizeInBytes);
moveTempRowsToActualBuffer(tempRowCount);
rowsSizeInBytes = tempRowsSizeInBytes;
if ((long) rowBuffer.bufferedRowCount + tempRowCount >= Integer.MAX_VALUE) {
throw new SFException(ErrorCode.INTERNAL_ERROR, "Row count reaches MAX value");
}
rowBuffer.bufferedRowCount += tempRowCount;
rowBuffer.statsMap.forEach(
(colName, stats) ->
rowBuffer.statsMap.put(
colName,
RowBufferStats.getCombinedStats(stats, rowBuffer.tempStatsMap.get(colName))));
rowBuffer.channelState.setOffsetToken(offsetToken);
}
rowBuffer.bufferSize += rowsSizeInBytes;
rowBuffer.rowSizeMetric.accept(rowsSizeInBytes);
return response;
}
}

// Map the column name to the stats
@VisibleForTesting Map<String, RowBufferStats> statsMap;
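
All three strategies above share one contract. CONTINUE commits each valid row immediately and reports per-row errors; ABORT and SKIP_BATCH both stage the batch in the temp buffer and commit it only when it is clean, but ABORT throws at the first error while SKIP_BATCH keeps validating and returns every error. A stripped-down, self-contained model of the SKIP_BATCH stage-then-commit flow (all names here are illustrative, not SDK API):

import java.util.ArrayList;
import java.util.List;

/** Toy model of ON_ERROR=SKIP_BATCH: stage the batch, commit only if every row validates. */
class SkipBatchModel {
  final List<String> buffer = new ArrayList<>(); // plays the role of the actual row buffer

  /** Returns one message per bad row; on any error the whole batch is dropped. */
  List<String> insertBatch(List<String> rows) {
    List<String> staged = new ArrayList<>(); // plays the role of the temp buffer
    List<String> errors = new ArrayList<>();
    for (int i = 0; i < rows.size(); i++) {
      String row = rows.get(i);
      if (row.isEmpty()) { // stand-in for real per-row validation
        errors.add("row " + i + ": empty row"); // keep going so every error is reported
      } else {
        staged.add(row); // good rows are validated and staged, but not yet committed
      }
    }
    if (errors.isEmpty()) {
      buffer.addAll(staged); // commit: the analogue of moveTempRowsToActualBuffer
    }
    return errors; // non-empty means nothing from this batch was ingested
  }
}

This also makes the cost noted in the commit message concrete: when a batch is rejected and later resubmitted after fixing the bad rows, the good rows go through validation a second time.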

@@ -297,76 +423,20 @@ Set<String> verifyInputColumns(
  @Override
  public InsertValidationResponse insertRows(
      Iterable<Map<String, Object>> rows, String offsetToken) {
-    float rowsSizeInBytes = 0F;
    if (!hasColumns()) {
      throw new SFException(ErrorCode.INTERNAL_ERROR, "Empty column fields");
    }
-    InsertValidationResponse response = new InsertValidationResponse();
+    InsertValidationResponse response = null;
    this.flushLock.lock();
    try {
      this.channelState.updateInsertStats(System.currentTimeMillis(), this.bufferedRowCount);
-      if (onErrorOption == OpenChannelRequest.OnErrorOption.CONTINUE) {
-        // Used to map incoming row(nth row) to InsertError(for nth row) in response
-        int rowIndex = 0;
-        for (Map<String, Object> row : rows) {
-          InsertValidationResponse.InsertError error =
-              new InsertValidationResponse.InsertError(row, rowIndex);
-          try {
-            Set<String> inputColumnNames = verifyInputColumns(row, error, rowIndex);
-            rowsSizeInBytes +=
-                addRow(row, this.bufferedRowCount, this.statsMap, inputColumnNames, rowIndex);
-            this.bufferedRowCount++;
-          } catch (SFException e) {
-            error.setException(e);
-            response.addError(error);
-          } catch (Throwable e) {
-            logger.logWarn("Unexpected error happens during insertRows: {}", e);
-            error.setException(new SFException(e, ErrorCode.INTERNAL_ERROR, e.getMessage()));
-            response.addError(error);
-          }
-          checkBatchSizeEnforcedMaximum(rowsSizeInBytes);
-          rowIndex++;
-          if (this.bufferedRowCount == Integer.MAX_VALUE) {
-            throw new SFException(ErrorCode.INTERNAL_ERROR, "Row count reaches MAX value");
-          }
-        }
-        checkBatchSizeRecommendedMaximum(rowsSizeInBytes);
-      } else {
-        // If the on_error option is ABORT, simply throw the first exception
-        float tempRowsSizeInBytes = 0F;
-        int tempRowCount = 0;
-        for (Map<String, Object> row : rows) {
-          Set<String> inputColumnNames = verifyInputColumns(row, null, tempRowCount);
-          tempRowsSizeInBytes +=
-              addTempRow(row, tempRowCount, this.tempStatsMap, inputColumnNames, tempRowCount);
-          checkBatchSizeEnforcedMaximum(tempRowsSizeInBytes);
-          tempRowCount++;
-        }
-        checkBatchSizeRecommendedMaximum(tempRowsSizeInBytes);
-
-        moveTempRowsToActualBuffer(tempRowCount);
-
-        rowsSizeInBytes = tempRowsSizeInBytes;
-        if ((long) this.bufferedRowCount + tempRowCount >= Integer.MAX_VALUE) {
-          throw new SFException(ErrorCode.INTERNAL_ERROR, "Row count reaches MAX value");
-        }
-        this.bufferedRowCount += tempRowCount;
-        this.statsMap.forEach(
-            (colName, stats) ->
-                this.statsMap.put(
-                    colName,
-                    RowBufferStats.getCombinedStats(stats, this.tempStatsMap.get(colName))));
-      }
-
-      this.bufferSize += rowsSizeInBytes;
-      this.channelState.setOffsetToken(offsetToken);
-      this.rowSizeMetric.accept(rowsSizeInBytes);
+      IngestionStrategy<T> ingestionStrategy = createIngestionStrategy(onErrorOption);
+      response = ingestionStrategy.insertRows(this, rows, offsetToken);
    } finally {
      this.tempStatsMap.values().forEach(RowBufferStats::reset);
      clearTempRows();
      this.flushLock.unlock();
    }

    return response;
  }

@@ -581,4 +651,18 @@ private void checkBatchSizeRecommendedMaximum(float batchSizeInBytes) {
INSERT_ROWS_RECOMMENDED_MAX_BATCH_SIZE_IN_BYTES);
}
}

/** Create the ingestion strategy based on the channel on_error option */
IngestionStrategy<T> createIngestionStrategy(OpenChannelRequest.OnErrorOption onErrorOption) {
switch (onErrorOption) {
case CONTINUE:
return new ContinueIngestionStrategy<>();
case ABORT:
return new AbortIngestionStrategy<>();
case SKIP_BATCH:
return new SkipBatchIngestionStrategy<>();
default:
throw new IllegalArgumentException("Unknown on error option: " + onErrorOption);
}
}
}
@@ -0,0 +1,21 @@
/*
* Copyright (c) 2023 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;

import java.util.Map;
import net.snowflake.ingest.streaming.InsertValidationResponse;

/** Interface to insert a batch of rows into the row buffer based on different on_error options */
public interface IngestionStrategy<T> {
/**
* Insert a batch of rows into the row buffer
*
* @param rowBuffer the row buffer to insert the rows into
* @param rows the input rows
* @param offsetToken offset token of the latest row in the batch
* @return insert response that possibly contains errors because of insertion failures
*/
InsertValidationResponse insertRows(
AbstractRowBuffer<T> rowBuffer, Iterable<Map<String, Object>> rows, String offsetToken);
}
@@ -97,7 +97,8 @@ public void setupSchema(List<ColumnMetadata> columns) {
      this.statsMap.put(
          column.getInternalName(), new RowBufferStats(column.getName(), column.getCollation()));

-      if (onErrorOption == OpenChannelRequest.OnErrorOption.ABORT) {
+      if (onErrorOption == OpenChannelRequest.OnErrorOption.ABORT
+          || onErrorOption == OpenChannelRequest.OnErrorOption.SKIP_BATCH) {
        this.tempStatsMap.put(
            column.getInternalName(), new RowBufferStats(column.getName(), column.getCollation()));
      }
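
SKIP_BATCH joins ABORT here because both stage incoming rows against tempStatsMap and fold the temp stats into statsMap only when the batch commits; the finally block in insertRows resets the temp stats either way. A rough sketch of that fold, using a simplified stats type (only min/max tracking is shown; the real RowBufferStats tracks more, and these names are illustrative):

import java.util.Map;

/** Simplified stand-in for RowBufferStats: tracks only an integer min/max. */
class ColumnStats {
  Integer min, max;

  void track(int v) {
    min = (min == null) ? v : Math.min(min, v);
    max = (max == null) ? v : Math.max(max, v);
  }

  /** Same shape as RowBufferStats.getCombinedStats: merge two stats into a new object. */
  static ColumnStats combine(ColumnStats a, ColumnStats b) {
    ColumnStats merged = new ColumnStats();
    for (ColumnStats s : new ColumnStats[] {a, b}) {
      if (s != null && s.min != null) {
        merged.track(s.min);
        merged.track(s.max);
      }
    }
    return merged;
  }

  /** On a clean batch, fold the temp stats into the committed stats, as the diff above does. */
  static void commitTempStats(Map<String, ColumnStats> stats, Map<String, ColumnStats> temp) {
    stats.forEach((col, s) -> stats.put(col, combine(s, temp.get(col))));
  }
}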