diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java index a3ed90c54..dde639d00 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java @@ -236,10 +236,11 @@ public float getSize() { * * @param row the input row * @param error the insert error that we return to the customer + * @param rowIndex the index of the current row in the input batch * @return the set of input column names */ Set verifyInputColumns( - Map row, InsertValidationResponse.InsertError error) { + Map row, InsertValidationResponse.InsertError error, int rowIndex) { // Map of unquoted column name -> original column name Map inputColNamesMap = row.keySet().stream() @@ -260,7 +261,8 @@ Set verifyInputColumns( throw new SFException( ErrorCode.INVALID_FORMAT_ROW, "Extra columns: " + extraCols, - "Columns not present in the table shouldn't be specified. Row Index:%d"); + String.format( + "Columns not present in the table shouldn't be specified, rowIndex:%d", rowIndex)); } // Check for missing columns in the row @@ -278,7 +280,8 @@ Set verifyInputColumns( throw new SFException( ErrorCode.INVALID_FORMAT_ROW, "Missing columns: " + missingCols, - "Values for all non-nullable columns must be specified. Row Index:%d"); + String.format( + "Values for all non-nullable columns must be specified, rowIndex:%d", rowIndex)); } return inputColNamesMap.keySet(); @@ -304,12 +307,12 @@ public InsertValidationResponse insertRows( this.channelState.updateInsertStats(System.currentTimeMillis(), this.bufferedRowCount); if (onErrorOption == OpenChannelRequest.OnErrorOption.CONTINUE) { // Used to map incoming row(nth row) to InsertError(for nth row) in response - long rowIndex = 0; + int rowIndex = 0; for (Map row : rows) { InsertValidationResponse.InsertError error = new InsertValidationResponse.InsertError(row, rowIndex); try { - Set inputColumnNames = verifyInputColumns(row, error); + Set inputColumnNames = verifyInputColumns(row, error, rowIndex); rowsSizeInBytes += addRow(row, this.bufferedRowCount, this.statsMap, inputColumnNames, rowIndex); this.bufferedRowCount++; @@ -333,7 +336,7 @@ public InsertValidationResponse insertRows( float tempRowsSizeInBytes = 0F; int tempRowCount = 0; for (Map row : rows) { - Set inputColumnNames = verifyInputColumns(row, null); + Set inputColumnNames = verifyInputColumns(row, null, tempRowCount); tempRowsSizeInBytes += addTempRow(row, tempRowCount, this.tempStatsMap, inputColumnNames, tempRowCount); checkBatchSizeEnforcedMaximum(tempRowsSizeInBytes); diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/DataValidationUtil.java b/src/main/java/net/snowflake/ingest/streaming/internal/DataValidationUtil.java index 7982ca6a4..4514b98b7 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/DataValidationUtil.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/DataValidationUtil.java @@ -24,11 +24,7 @@ import java.time.ZoneOffset; import java.time.ZonedDateTime; import java.time.format.DateTimeParseException; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; +import java.util.*; import java.util.function.Supplier; import net.snowflake.client.jdbc.internal.google.common.collect.Sets; import net.snowflake.client.jdbc.internal.snowflake.common.core.SnowflakeDateTimeFormat; @@ -458,8 +454,8 @@ static TimestampWrapper validateAndParseTimestamp( throw new SFException( ErrorCode.INVALID_VALUE_ROW, String.format( - "Timestamp out of representable inclusive range of years between 1 and 9999, Row" - + " Index:%d", + "Timestamp out of representable inclusive range of years between 1 and 9999," + + " rowIndex:%d", insertRowIndex)); } return new TimestampWrapper(offsetDateTime, scale); @@ -592,8 +588,8 @@ static int validateAndParseDate(String columnName, Object input, long insertRowI throw new SFException( ErrorCode.INVALID_VALUE_ROW, String.format( - "Date out of representable inclusive range of years between -9999 and 9999, Row" - + " Index:%d", + "Date out of representable inclusive range of years between -9999 and 9999," + + " rowIndex:%d", insertRowIndex)); } @@ -820,7 +816,7 @@ static void checkValueInRange( throw new SFException( ErrorCode.INVALID_FORMAT_ROW, String.format( - "Number out of representable exclusive range of (-1e%s..1e%s), Row Index:%d", + "Number out of representable exclusive range of (-1e%s..1e%s), rowIndex:%d", precision - scale, precision - scale, insertRowIndex)); } } @@ -864,8 +860,7 @@ private static SFException typeNotAllowedException( return new SFException( ErrorCode.INVALID_FORMAT_ROW, String.format( - "Object of type %s cannot be ingested into Snowflake column %s of type %s, Row" - + " Index:%d", + "Object of type %s cannot be ingested into Snowflake column %s of type %s, rowIndex:%d", javaType.getName(), columnName, snowflakeType, insertRowIndex), String.format( String.format("Allowed Java types: %s", String.join(", ", allowedJavaTypes)))); @@ -888,7 +883,7 @@ private static SFException valueFormatNotAllowedException( return new SFException( ErrorCode.INVALID_VALUE_ROW, String.format( - "Value cannot be ingested into Snowflake column %s of type %s, Row Index:%d, reason:" + "Value cannot be ingested into Snowflake column %s of type %s, rowIndex:%d, reason:" + " %s", columnName, snowflakeType, rowIndex, reason)); } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java index 567dbf127..75966eb35 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java @@ -210,7 +210,7 @@ private float addRow( throw new SFException( ErrorCode.MAX_ROW_SIZE_EXCEEDED, String.format( - "rowSizeInBytes=%.3f, maxAllowedRowSizeInBytes=%d, Row Index=%d", + "rowSizeInBytes:%.3f, maxAllowedRowSizeInBytes:%d, rowIndex:%d", size, clientBufferParameters.getMaxAllowedRowSizeInBytes(), insertRowsCurrIndex)); } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/DataValidationUtilTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/DataValidationUtilTest.java index 0694020e7..86706fcf2 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/DataValidationUtilTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/DataValidationUtilTest.java @@ -676,19 +676,19 @@ public void testTooLargeMultiByteSemiStructuredValues() { expectErrorCodeAndMessage( ErrorCode.INVALID_VALUE_ROW, "The given row cannot be converted to the internal format due to invalid value: Value" - + " cannot be ingested into Snowflake column COL of type VARIANT, Row Index:0, reason:" + + " cannot be ingested into Snowflake column COL of type VARIANT, rowIndex:0, reason:" + " Variant too long: length=18874376 maxLength=16777152", () -> validateAndParseVariant("COL", m, 0)); expectErrorCodeAndMessage( ErrorCode.INVALID_VALUE_ROW, "The given row cannot be converted to the internal format due to invalid value: Value" - + " cannot be ingested into Snowflake column COL of type ARRAY, Row Index:0, reason:" + + " cannot be ingested into Snowflake column COL of type ARRAY, rowIndex:0, reason:" + " Array too large. length=18874378 maxLength=16777152", () -> validateAndParseArray("COL", m, 0)); expectErrorCodeAndMessage( ErrorCode.INVALID_VALUE_ROW, "The given row cannot be converted to the internal format due to invalid value: Value" - + " cannot be ingested into Snowflake column COL of type OBJECT, Row Index:0, reason:" + + " cannot be ingested into Snowflake column COL of type OBJECT, rowIndex:0, reason:" + " Object too large. length=18874376 maxLength=16777152", () -> validateAndParseObject("COL", m, 0)); } @@ -1005,13 +1005,13 @@ public void testExceptionMessages() { expectErrorCodeAndMessage( ErrorCode.INVALID_FORMAT_ROW, "The given row cannot be converted to the internal format: Object of type java.lang.Object" - + " cannot be ingested into Snowflake column COL of type BOOLEAN, Row Index:0. Allowed" + + " cannot be ingested into Snowflake column COL of type BOOLEAN, rowIndex:0. Allowed" + " Java types: boolean, Number, String", () -> validateAndParseBoolean("COL", new Object(), 0)); expectErrorCodeAndMessage( ErrorCode.INVALID_VALUE_ROW, "The given row cannot be converted to the internal format due to invalid value: Value" - + " cannot be ingested into Snowflake column COL of type BOOLEAN, Row Index:0, reason:" + + " cannot be ingested into Snowflake column COL of type BOOLEAN, rowIndex:0, reason:" + " Not a valid boolean, see" + " https://docs.snowflake.com/en/sql-reference/data-types-logical.html#conversion-to-boolean" + " for the list of supported formats", @@ -1021,13 +1021,13 @@ public void testExceptionMessages() { expectErrorCodeAndMessage( ErrorCode.INVALID_FORMAT_ROW, "The given row cannot be converted to the internal format: Object of type java.lang.Object" - + " cannot be ingested into Snowflake column COL of type TIME, Row Index:0. Allowed" + + " cannot be ingested into Snowflake column COL of type TIME, rowIndex:0. Allowed" + " Java types: String, LocalTime, OffsetTime", () -> validateAndParseTime("COL", new Object(), 10, 0)); expectErrorCodeAndMessage( ErrorCode.INVALID_VALUE_ROW, "The given row cannot be converted to the internal format due to invalid value: Value" - + " cannot be ingested into Snowflake column COL of type TIME, Row Index:0, reason:" + + " cannot be ingested into Snowflake column COL of type TIME, rowIndex:0, reason:" + " Not a valid time, see" + " https://docs.snowflake.com/en/user-guide/data-load-snowpipe-streaming-overview for" + " the list of supported formats", @@ -1037,13 +1037,13 @@ public void testExceptionMessages() { expectErrorCodeAndMessage( ErrorCode.INVALID_FORMAT_ROW, "The given row cannot be converted to the internal format: Object of type java.lang.Object" - + " cannot be ingested into Snowflake column COL of type DATE, Row Index:0. Allowed" + + " cannot be ingested into Snowflake column COL of type DATE, rowIndex:0. Allowed" + " Java types: String, LocalDate, LocalDateTime, ZonedDateTime, OffsetDateTime", () -> validateAndParseDate("COL", new Object(), 0)); expectErrorCodeAndMessage( ErrorCode.INVALID_VALUE_ROW, "The given row cannot be converted to the internal format due to invalid value: Value" - + " cannot be ingested into Snowflake column COL of type DATE, Row Index:0, reason:" + + " cannot be ingested into Snowflake column COL of type DATE, rowIndex:0, reason:" + " Not a valid value, see" + " https://docs.snowflake.com/en/user-guide/data-load-snowpipe-streaming-overview for" + " the list of supported formats", @@ -1053,14 +1053,14 @@ public void testExceptionMessages() { expectErrorCodeAndMessage( ErrorCode.INVALID_FORMAT_ROW, "The given row cannot be converted to the internal format: Object of type java.lang.Object" - + " cannot be ingested into Snowflake column COL of type TIMESTAMP, Row Index:0." + + " cannot be ingested into Snowflake column COL of type TIMESTAMP, rowIndex:0." + " Allowed Java types: String, LocalDate, LocalDateTime, ZonedDateTime," + " OffsetDateTime", () -> validateAndParseTimestamp("COL", new Object(), 3, UTC, true, 0)); expectErrorCodeAndMessage( ErrorCode.INVALID_VALUE_ROW, "The given row cannot be converted to the internal format due to invalid value: Value" - + " cannot be ingested into Snowflake column COL of type TIMESTAMP, Row Index:0," + + " cannot be ingested into Snowflake column COL of type TIMESTAMP, rowIndex:0," + " reason: Not a valid value, see" + " https://docs.snowflake.com/en/user-guide/data-load-snowpipe-streaming-overview for" + " the list of supported formats", @@ -1070,14 +1070,14 @@ public void testExceptionMessages() { expectErrorCodeAndMessage( ErrorCode.INVALID_FORMAT_ROW, "The given row cannot be converted to the internal format: Object of type java.lang.Object" - + " cannot be ingested into Snowflake column COL of type TIMESTAMP, Row Index:0." + + " cannot be ingested into Snowflake column COL of type TIMESTAMP, rowIndex:0." + " Allowed Java types: String, LocalDate, LocalDateTime, ZonedDateTime," + " OffsetDateTime", () -> validateAndParseTimestamp("COL", new Object(), 3, UTC, false, 0)); expectErrorCodeAndMessage( ErrorCode.INVALID_VALUE_ROW, "The given row cannot be converted to the internal format due to invalid value: Value" - + " cannot be ingested into Snowflake column COL of type TIMESTAMP, Row Index:0," + + " cannot be ingested into Snowflake column COL of type TIMESTAMP, rowIndex:0," + " reason: Not a valid value, see" + " https://docs.snowflake.com/en/user-guide/data-load-snowpipe-streaming-overview for" + " the list of supported formats", @@ -1087,14 +1087,14 @@ public void testExceptionMessages() { expectErrorCodeAndMessage( ErrorCode.INVALID_FORMAT_ROW, "The given row cannot be converted to the internal format: Object of type java.lang.Object" - + " cannot be ingested into Snowflake column COL of type TIMESTAMP, Row Index:0." + + " cannot be ingested into Snowflake column COL of type TIMESTAMP, rowIndex:0." + " Allowed Java types: String, LocalDate, LocalDateTime, ZonedDateTime," + " OffsetDateTime", () -> validateAndParseTimestamp("COL", new Object(), 3, UTC, false, 0)); expectErrorCodeAndMessage( ErrorCode.INVALID_VALUE_ROW, "The given row cannot be converted to the internal format due to invalid value: Value" - + " cannot be ingested into Snowflake column COL of type TIMESTAMP, Row Index:0," + + " cannot be ingested into Snowflake column COL of type TIMESTAMP, rowIndex:0," + " reason: Not a valid value, see" + " https://docs.snowflake.com/en/user-guide/data-load-snowpipe-streaming-overview for" + " the list of supported formats", @@ -1104,13 +1104,13 @@ public void testExceptionMessages() { expectErrorCodeAndMessage( ErrorCode.INVALID_FORMAT_ROW, "The given row cannot be converted to the internal format: Object of type java.lang.Object" - + " cannot be ingested into Snowflake column COL of type NUMBER, Row Index:0. Allowed" + + " cannot be ingested into Snowflake column COL of type NUMBER, rowIndex:0. Allowed" + " Java types: int, long, byte, short, float, double, BigDecimal, BigInteger, String", () -> validateAndParseBigDecimal("COL", new Object(), 0)); expectErrorCodeAndMessage( ErrorCode.INVALID_VALUE_ROW, "The given row cannot be converted to the internal format due to invalid value: Value" - + " cannot be ingested into Snowflake column COL of type NUMBER, Row Index:0, reason:" + + " cannot be ingested into Snowflake column COL of type NUMBER, rowIndex:0, reason:" + " Not a valid number", () -> validateAndParseBigDecimal("COL", "abc", 0)); @@ -1118,13 +1118,13 @@ public void testExceptionMessages() { expectErrorCodeAndMessage( ErrorCode.INVALID_FORMAT_ROW, "The given row cannot be converted to the internal format: Object of type java.lang.Object" - + " cannot be ingested into Snowflake column COL of type REAL, Row Index:0. Allowed" + + " cannot be ingested into Snowflake column COL of type REAL, rowIndex:0. Allowed" + " Java types: Number, String", () -> validateAndParseReal("COL", new Object(), 0)); expectErrorCodeAndMessage( ErrorCode.INVALID_VALUE_ROW, "The given row cannot be converted to the internal format due to invalid value: Value" - + " cannot be ingested into Snowflake column COL of type REAL, Row Index:0, reason:" + + " cannot be ingested into Snowflake column COL of type REAL, rowIndex:0, reason:" + " Not a valid decimal number", () -> validateAndParseReal("COL", "abc", 0)); @@ -1132,13 +1132,13 @@ public void testExceptionMessages() { expectErrorCodeAndMessage( ErrorCode.INVALID_FORMAT_ROW, "The given row cannot be converted to the internal format: Object of type java.lang.Object" - + " cannot be ingested into Snowflake column COL of type STRING, Row Index:0. Allowed" + + " cannot be ingested into Snowflake column COL of type STRING, rowIndex:0. Allowed" + " Java types: String, Number, boolean, char", () -> validateAndParseString("COL", new Object(), Optional.empty(), 0)); expectErrorCodeAndMessage( ErrorCode.INVALID_VALUE_ROW, "The given row cannot be converted to the internal format due to invalid value: Value" - + " cannot be ingested into Snowflake column COL of type STRING, Row Index:0, reason:" + + " cannot be ingested into Snowflake column COL of type STRING, rowIndex:0, reason:" + " String too long: length=3 characters maxLength=2 characters", () -> validateAndParseString("COL", "abc", Optional.of(2), 0)); @@ -1146,19 +1146,19 @@ public void testExceptionMessages() { expectErrorCodeAndMessage( ErrorCode.INVALID_FORMAT_ROW, "The given row cannot be converted to the internal format: Object of type java.lang.Object" - + " cannot be ingested into Snowflake column COL of type BINARY, Row Index:0. Allowed" + + " cannot be ingested into Snowflake column COL of type BINARY, rowIndex:0. Allowed" + " Java types: byte[], String", () -> validateAndParseBinary("COL", new Object(), Optional.empty(), 0)); expectErrorCodeAndMessage( ErrorCode.INVALID_VALUE_ROW, "The given row cannot be converted to the internal format due to invalid value: Value" - + " cannot be ingested into Snowflake column COL of type BINARY, Row Index:0, reason:" + + " cannot be ingested into Snowflake column COL of type BINARY, rowIndex:0, reason:" + " Binary too long: length=2 maxLength=1", () -> validateAndParseBinary("COL", new byte[] {1, 2}, Optional.of(1), 0)); expectErrorCodeAndMessage( ErrorCode.INVALID_VALUE_ROW, "The given row cannot be converted to the internal format due to invalid value: Value" - + " cannot be ingested into Snowflake column COL of type BINARY, Row Index:0, reason:" + + " cannot be ingested into Snowflake column COL of type BINARY, rowIndex:0, reason:" + " Not a valid hex string", () -> validateAndParseBinary("COL", "ghi", Optional.empty(), 0)); @@ -1166,14 +1166,14 @@ public void testExceptionMessages() { expectErrorCodeAndMessage( ErrorCode.INVALID_FORMAT_ROW, "The given row cannot be converted to the internal format: Object of type java.lang.Object" - + " cannot be ingested into Snowflake column COL of type VARIANT, Row Index:0. Allowed" + + " cannot be ingested into Snowflake column COL of type VARIANT, rowIndex:0. Allowed" + " Java types: String, Primitive data types and their arrays, java.time.*, List," + " Map, T[]", () -> validateAndParseVariant("COL", new Object(), 0)); expectErrorCodeAndMessage( ErrorCode.INVALID_VALUE_ROW, "The given row cannot be converted to the internal format due to invalid value: Value" - + " cannot be ingested into Snowflake column COL of type VARIANT, Row Index:0, reason:" + + " cannot be ingested into Snowflake column COL of type VARIANT, rowIndex:0, reason:" + " Not a valid JSON", () -> validateAndParseVariant("COL", "][", 0)); @@ -1181,14 +1181,14 @@ public void testExceptionMessages() { expectErrorCodeAndMessage( ErrorCode.INVALID_FORMAT_ROW, "The given row cannot be converted to the internal format: Object of type java.lang.Object" - + " cannot be ingested into Snowflake column COL of type ARRAY, Row Index:0. Allowed" + + " cannot be ingested into Snowflake column COL of type ARRAY, rowIndex:0. Allowed" + " Java types: String, Primitive data types and their arrays, java.time.*, List," + " Map, T[]", () -> validateAndParseArray("COL", new Object(), 0)); expectErrorCodeAndMessage( ErrorCode.INVALID_VALUE_ROW, "The given row cannot be converted to the internal format due to invalid value: Value" - + " cannot be ingested into Snowflake column COL of type ARRAY, Row Index:0, reason:" + + " cannot be ingested into Snowflake column COL of type ARRAY, rowIndex:0, reason:" + " Not a valid JSON", () -> validateAndParseArray("COL", "][", 0)); @@ -1196,14 +1196,14 @@ public void testExceptionMessages() { expectErrorCodeAndMessage( ErrorCode.INVALID_FORMAT_ROW, "The given row cannot be converted to the internal format: Object of type java.lang.Object" - + " cannot be ingested into Snowflake column COL of type OBJECT, Row Index:0. Allowed" + + " cannot be ingested into Snowflake column COL of type OBJECT, rowIndex:0. Allowed" + " Java types: String, Primitive data types and their arrays, java.time.*, List," + " Map, T[]", () -> validateAndParseObject("COL", new Object(), 0)); expectErrorCodeAndMessage( ErrorCode.INVALID_VALUE_ROW, "The given row cannot be converted to the internal format due to invalid value: Value" - + " cannot be ingested into Snowflake column COL of type OBJECT, Row Index:0, reason:" + + " cannot be ingested into Snowflake column COL of type OBJECT, rowIndex:0, reason:" + " Not a valid JSON", () -> validateAndParseObject("COL", "}{", 0)); } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java index 45426283d..8d71d9a44 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java @@ -307,8 +307,8 @@ public void testRowIndexWithMultipleRowsWithError() { .getMessage() .equalsIgnoreCase( "The given row cannot be converted to the internal format due to invalid value:" - + " Value cannot be ingested into Snowflake column COLCHAR of type STRING, Row" - + " Index:1, reason: String too long: length=22 characters maxLength=11" + + " Value cannot be ingested into Snowflake column COLCHAR of type STRING," + + " rowIndex:1, reason: String too long: length=22 characters maxLength=11" + " characters")); } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java index ad09c0a69..8fbf67264 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java @@ -588,8 +588,8 @@ public void testInsertTooLargeRow() { .collect(Collectors.toList()); String expectedMessage = - "The given row exceeds the maximum allowed row size rowSizeInBytes=67109128.000," - + " maxAllowedRowSizeInBytes=67108864, Row Index=0"; + "The given row exceeds the maximum allowed row size rowSizeInBytes:67109128.000," + + " maxAllowedRowSizeInBytes:67108864, rowIndex:0"; Map row = new HashMap<>(); schema.forEach(x -> row.put(x.getName(), byteArrayOneMb));