diff --git a/src/main/java/net/snowflake/ingest/streaming/InsertValidationResponse.java b/src/main/java/net/snowflake/ingest/streaming/InsertValidationResponse.java
index 31a9c416f..f3ccae363 100644
--- a/src/main/java/net/snowflake/ingest/streaming/InsertValidationResponse.java
+++ b/src/main/java/net/snowflake/ingest/streaming/InsertValidationResponse.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2021 Snowflake Computing Inc. All rights reserved.
+ * Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved.
  */
 
 package net.snowflake.ingest.streaming;
@@ -115,6 +115,18 @@ public void setExtraColNames(List<String> extraColNames) {
       this.extraColNames = extraColNames;
     }
 
+    /**
+     * Add an extra column name in the input row compared with the table schema
+     *
+     * @param extraColName the extra column name
+     */
+    public void addExtraColName(String extraColName) {
+      if (extraColNames == null) {
+        extraColNames = new ArrayList<>();
+      }
+      extraColNames.add(extraColName);
+    }
+
     /** Get the list of extra column names in the input row compared with the table schema */
     public List<String> getExtraColNames() {
       return extraColNames;
@@ -125,6 +137,18 @@ public void setMissingNotNullColNames(List<String> missingNotNullColNames) {
       this.missingNotNullColNames = missingNotNullColNames;
     }
 
+    /**
+     * Add a missing non-nullable column name in the input row compared with the table schema
+     *
+     * @param missingNotNullColName the missing non-nullable column name
+     */
+    public void addMissingNotNullColName(String missingNotNullColName) {
+      if (missingNotNullColNames == null) {
+        missingNotNullColNames = new ArrayList<>();
+      }
+      missingNotNullColNames.add(missingNotNullColName);
+    }
+
     /**
      * Get the list of missing non-nullable column names in the input row compared with the table
      * schema
@@ -141,6 +165,19 @@ public void setNullValueForNotNullColNames(List<String> nullValueForNotNullColNames) {
       this.nullValueForNotNullColNames = nullValueForNotNullColNames;
     }
 
+    /**
+     * Add a name of non-nullable column which have null value in the input row compared with the
+     * table schema
+     *
+     * @param nullValueForNotNullColName the name of non-nullable column which have null value
+     */
+    public void addNullValueForNotNullColName(String nullValueForNotNullColName) {
+      if (nullValueForNotNullColNames == null) {
+        nullValueForNotNullColNames = new ArrayList<>();
+      }
+      nullValueForNotNullColNames.add(nullValueForNotNullColName);
+    }
+
     /**
      * Get the list of names of non-nullable column which have null value in the input row compared
      * with the table schema
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java
index 69558d5d9..00f79ad0c 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java
@@ -162,7 +162,12 @@ public InsertValidationResponse insertRows(
           Set<String> inputColumnNames = verifyInputColumns(row, error, rowIndex);
           rowsSizeInBytes +=
               addRow(
-                  row, rowBuffer.bufferedRowCount, rowBuffer.statsMap, inputColumnNames, rowIndex);
+                  row,
+                  rowBuffer.bufferedRowCount,
+                  rowBuffer.statsMap,
+                  inputColumnNames,
+                  rowIndex,
+                  error);
           rowBuffer.bufferedRowCount++;
         } catch (SFException e) {
           error.setException(e);
@@ -200,7 +205,13 @@ public InsertValidationResponse insertRows(
       for (Map<String, Object> row : rows) {
         Set<String> inputColumnNames = verifyInputColumns(row, null, tempRowCount);
         tempRowsSizeInBytes +=
-            addTempRow(row, tempRowCount, rowBuffer.tempStatsMap,
inputColumnNames, tempRowCount); + addTempRow( + row, + tempRowCount, + rowBuffer.tempStatsMap, + inputColumnNames, + tempRowCount, + new InsertValidationResponse.InsertError(row, 0) /* dummy error */); tempRowCount++; if ((long) rowBuffer.bufferedRowCount + tempRowCount >= Integer.MAX_VALUE) { throw new SFException(ErrorCode.INTERNAL_ERROR, "Row count reaches MAX value"); @@ -249,7 +260,8 @@ public InsertValidationResponse insertRows( try { Set inputColumnNames = verifyInputColumns(row, error, rowIndex); tempRowsSizeInBytes += - addTempRow(row, tempRowCount, rowBuffer.tempStatsMap, inputColumnNames, rowIndex); + addTempRow( + row, tempRowCount, rowBuffer.tempStatsMap, inputColumnNames, rowIndex, error); tempRowCount++; } catch (SFException e) { error.setException(e); @@ -575,6 +587,8 @@ public ChannelData flush() { * @param formattedInputColumnNames list of input column names after formatting * @param insertRowIndex Index of the rows given in insertRows API. Not the same as * bufferedRowIndex + * @param error Insert error object, used to populate error details when doing structured data + * type parsing * @return row size */ abstract float addRow( @@ -582,7 +596,8 @@ abstract float addRow( int bufferedRowIndex, Map statsMap, Set formattedInputColumnNames, - final long insertRowIndex); + final long insertRowIndex, + InsertValidationResponse.InsertError error); /** * Add an input row to the temporary row buffer. @@ -595,6 +610,8 @@ abstract float addRow( * @param statsMap column stats map * @param formattedInputColumnNames list of input column names after formatting * @param insertRowIndex index of the row being inserteed from User Input List + * @param error Insert error object, used to populate error details when doing structured data + * type parsing * @return row size */ abstract float addTempRow( @@ -602,7 +619,8 @@ abstract float addTempRow( int curRowIndex, Map statsMap, Set formattedInputColumnNames, - long insertRowIndex); + long insertRowIndex, + InsertValidationResponse.InsertError error); /** Move rows from the temporary buffer to the current row buffer. 
*/ abstract void moveTempRowsToActualBuffer(int tempRowCount); diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParser.java b/src/main/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParser.java index 3ad2f02eb..7a7ae5eab 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParser.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParser.java @@ -5,6 +5,7 @@ package net.snowflake.ingest.streaming.internal; import static net.snowflake.ingest.streaming.internal.DataValidationUtil.checkFixedLengthByteArray; +import static net.snowflake.ingest.utils.Utils.concatDotPath; import java.math.BigDecimal; import java.math.BigInteger; @@ -19,7 +20,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.stream.Collectors; +import net.snowflake.ingest.streaming.InsertValidationResponse; import net.snowflake.ingest.utils.Constants; import net.snowflake.ingest.utils.ErrorCode; import net.snowflake.ingest.utils.SFException; @@ -49,6 +50,7 @@ class IcebergParquetValueParser { * @param defaultTimezone default timezone to use for timestamp parsing * @param insertRowsCurrIndex Row index corresponding the row to parse (w.r.t input rows in * insertRows API, and not buffered row) + * @param error InsertError object to add errors * @return parsed value and byte size of Parquet internal representation */ static ParquetBufferValue parseColumnValueToParquet( @@ -57,10 +59,18 @@ static ParquetBufferValue parseColumnValueToParquet( Map statsMap, SubColumnFinder subColumnFinder, ZoneId defaultTimezone, - long insertRowsCurrIndex) { + long insertRowsCurrIndex, + InsertValidationResponse.InsertError error) { Utils.assertNotNull("Parquet column stats map", statsMap); return parseColumnValueToParquet( - value, type, statsMap, subColumnFinder, defaultTimezone, insertRowsCurrIndex, false); + value, + type, + statsMap, + subColumnFinder, + defaultTimezone, + insertRowsCurrIndex, + false /* isDescendantsOfRepeatingGroup */, + error); } private static ParquetBufferValue parseColumnValueToParquet( @@ -70,7 +80,8 @@ private static ParquetBufferValue parseColumnValueToParquet( SubColumnFinder subColumnFinder, ZoneId defaultTimezone, long insertRowsCurrIndex, - boolean isDescendantsOfRepeatingGroup) { + boolean isDescendantsOfRepeatingGroup, + InsertValidationResponse.InsertError error) { float estimatedParquetSize = 0F; if (type.getId() == null) { @@ -86,8 +97,8 @@ private static ParquetBufferValue parseColumnValueToParquet( } } + String path = subColumnFinder.getDotPath(id); if (value != null) { - String path = subColumnFinder.getDotPath(id); if (type.isPrimitive()) { RowBufferStats stats = statsMap.get(id.toString()); estimatedParquetSize += ParquetBufferValue.DEFINITION_LEVEL_ENCODING_BYTE_LEN; @@ -160,18 +171,14 @@ private static ParquetBufferValue parseColumnValueToParquet( subColumnFinder, defaultTimezone, insertRowsCurrIndex, - isDescendantsOfRepeatingGroup); + isDescendantsOfRepeatingGroup, + error); } } if (value == null) { if (type.isRepetition(Repetition.REQUIRED)) { - throw new SFException( - ErrorCode.INVALID_FORMAT_ROW, - subColumnFinder.getDotPath(id), - String.format( - "Passed null to non nullable field, rowIndex:%d, column:%s", - insertRowsCurrIndex, subColumnFinder.getDotPath(id))); + error.addNullValueForNotNullColName(path); } subColumnFinder .getSubColumns(id) @@ -381,6 +388,7 @@ private static int timeUnitToScale(LogicalTypeAnnotation.TimeUnit timeUnit) { 
* @param defaultTimezone default timezone to use for timestamp parsing * @param insertRowsCurrIndex Used for logging the row of index given in insertRows API * @param isDescendantsOfRepeatingGroup true if the column is a descendant of a repeating group, + * @param error InsertError object to add errors * @return list of parsed values */ private static ParquetBufferValue getGroupValue( @@ -390,7 +398,8 @@ private static ParquetBufferValue getGroupValue( SubColumnFinder subColumnFinder, ZoneId defaultTimezone, final long insertRowsCurrIndex, - boolean isDescendantsOfRepeatingGroup) { + boolean isDescendantsOfRepeatingGroup, + InsertValidationResponse.InsertError error) { LogicalTypeAnnotation logicalTypeAnnotation = type.getLogicalTypeAnnotation(); if (logicalTypeAnnotation == null) { return getStructValue( @@ -400,15 +409,16 @@ private static ParquetBufferValue getGroupValue( subColumnFinder, defaultTimezone, insertRowsCurrIndex, - isDescendantsOfRepeatingGroup); + isDescendantsOfRepeatingGroup, + error); } if (logicalTypeAnnotation instanceof LogicalTypeAnnotation.ListLogicalTypeAnnotation) { return get3LevelListValue( - value, type, statsMap, subColumnFinder, defaultTimezone, insertRowsCurrIndex); + value, type, statsMap, subColumnFinder, defaultTimezone, insertRowsCurrIndex, error); } if (logicalTypeAnnotation instanceof LogicalTypeAnnotation.MapLogicalTypeAnnotation) { return get3LevelMapValue( - value, type, statsMap, subColumnFinder, defaultTimezone, insertRowsCurrIndex); + value, type, statsMap, subColumnFinder, defaultTimezone, insertRowsCurrIndex, error); } throw new SFException( ErrorCode.UNKNOWN_DATA_TYPE, @@ -429,35 +439,48 @@ private static ParquetBufferValue getStructValue( SubColumnFinder subColumnFinder, ZoneId defaultTimezone, final long insertRowsCurrIndex, - boolean isDescendantsOfRepeatingGroup) { + boolean isDescendantsOfRepeatingGroup, + InsertValidationResponse.InsertError error) { Map structVal = DataValidationUtil.validateAndParseIcebergStruct( subColumnFinder.getDotPath(type.getId()), value, insertRowsCurrIndex); Set extraFields = new HashSet<>(structVal.keySet()); + List missingFields = new ArrayList<>(); List listVal = new ArrayList<>(type.getFieldCount()); float estimatedParquetSize = 0f; for (int i = 0; i < type.getFieldCount(); i++) { - ParquetBufferValue parsedValue = - parseColumnValueToParquet( - structVal.getOrDefault(type.getFieldName(i), null), - type.getType(i), - statsMap, - subColumnFinder, - defaultTimezone, - insertRowsCurrIndex, - isDescendantsOfRepeatingGroup); extraFields.remove(type.getFieldName(i)); - listVal.add(parsedValue.getValue()); - estimatedParquetSize += parsedValue.getSize(); + if (structVal.containsKey(type.getFieldName(i))) { + ParquetBufferValue parsedValue = + parseColumnValueToParquet( + structVal.get(type.getFieldName(i)), + type.getType(i), + statsMap, + subColumnFinder, + defaultTimezone, + insertRowsCurrIndex, + isDescendantsOfRepeatingGroup, + error); + listVal.add(parsedValue.getValue()); + estimatedParquetSize += parsedValue.getSize(); + } else { + if (type.getType(i).isRepetition(Repetition.REQUIRED)) { + missingFields.add(type.getFieldName(i)); + } else { + listVal.add(null); + } + } } - if (!extraFields.isEmpty()) { - String extraFieldsStr = extraFields.stream().collect(Collectors.joining(", ", "[", "]")); - throw new SFException( - ErrorCode.INVALID_FORMAT_ROW, - "Extra fields: " + extraFieldsStr, - String.format( - "Fields not present in the struct %s shouldn't be specified, rowIndex:%d", - 
subColumnFinder.getDotPath(type.getId()), insertRowsCurrIndex)); + + for (String missingField : missingFields) { + List missingFieldPath = new ArrayList<>(subColumnFinder.getPath(type.getId())); + missingFieldPath.add(missingField); + error.addMissingNotNullColName(concatDotPath(missingFieldPath.toArray(new String[0]))); + } + for (String extraField : extraFields) { + List extraFieldPath = new ArrayList<>(subColumnFinder.getPath(type.getId())); + extraFieldPath.add(extraField); + error.addExtraColName(concatDotPath(extraFieldPath.toArray(new String[0]))); } return new ParquetBufferValue(listVal, estimatedParquetSize); } @@ -475,7 +498,8 @@ private static ParquetBufferValue get3LevelListValue( Map statsMap, SubColumnFinder subColumnFinder, ZoneId defaultTimezone, - final long insertRowsCurrIndex) { + final long insertRowsCurrIndex, + InsertValidationResponse.InsertError error) { Iterable iterableVal = DataValidationUtil.validateAndParseIcebergList( subColumnFinder.getDotPath(type.getId()), value, insertRowsCurrIndex); @@ -490,7 +514,8 @@ private static ParquetBufferValue get3LevelListValue( subColumnFinder, defaultTimezone, insertRowsCurrIndex, - true); + true /* isDecedentOfRepeatingGroup */, + error); listVal.add(Collections.singletonList(parsedValue.getValue())); estimatedParquetSize += parsedValue.getSize(); } @@ -515,7 +540,8 @@ private static ParquetBufferValue get3LevelMapValue( Map statsMap, SubColumnFinder subColumnFinder, ZoneId defaultTimezone, - final long insertRowsCurrIndex) { + final long insertRowsCurrIndex, + InsertValidationResponse.InsertError error) { Map mapVal = DataValidationUtil.validateAndParseIcebergMap( subColumnFinder.getDotPath(type.getId()), value, insertRowsCurrIndex); @@ -530,7 +556,8 @@ private static ParquetBufferValue get3LevelMapValue( subColumnFinder, defaultTimezone, insertRowsCurrIndex, - true); + true /* isDecedentOfRepeatingGroup */, + error); ParquetBufferValue parsedValue = parseColumnValueToParquet( entry.getValue(), @@ -539,7 +566,8 @@ private static ParquetBufferValue get3LevelMapValue( subColumnFinder, defaultTimezone, insertRowsCurrIndex, - true); + true /* isDecedentOfRepeatingGroup */, + error); listVal.add(Arrays.asList(parsedKey.getValue(), parsedValue.getValue())); estimatedParquetSize += parsedKey.getSize() + parsedValue.getSize(); } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java index f5654d07b..96b261491 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java @@ -22,6 +22,7 @@ import net.snowflake.client.jdbc.internal.google.common.collect.Sets; import net.snowflake.ingest.connection.RequestBuilder; import net.snowflake.ingest.connection.TelemetryService; +import net.snowflake.ingest.streaming.InsertValidationResponse; import net.snowflake.ingest.streaming.OffsetTokenVerificationFunction; import net.snowflake.ingest.streaming.OpenChannelRequest; import net.snowflake.ingest.utils.Constants; @@ -275,8 +276,9 @@ float addRow( int bufferedRowIndex, Map statsMap, Set formattedInputColumnNames, - final long insertRowIndex) { - return addRow(row, this::writeRow, statsMap, formattedInputColumnNames, insertRowIndex); + final long insertRowIndex, + InsertValidationResponse.InsertError error) { + return addRow(row, this::writeRow, statsMap, formattedInputColumnNames, insertRowIndex, error); } void writeRow(List row) 
{ @@ -289,8 +291,9 @@ float addTempRow( int curRowIndex, Map statsMap, Set formattedInputColumnNames, - long insertRowIndex) { - return addRow(row, tempData::add, statsMap, formattedInputColumnNames, insertRowIndex); + long insertRowIndex, + InsertValidationResponse.InsertError error) { + return addRow(row, tempData::add, statsMap, formattedInputColumnNames, insertRowIndex, error); } /** @@ -303,6 +306,7 @@ float addTempRow( * @param insertRowsCurrIndex Row index of the input Rows passed in {@link * net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel#insertRows(Iterable, * String)} + * @param error insertion error object to populate if there is an error * @return row size */ private float addRow( @@ -310,7 +314,8 @@ private float addRow( Consumer> out, Map statsMap, Set inputColumnNames, - long insertRowsCurrIndex) { + long insertRowsCurrIndex, + InsertValidationResponse.InsertError error) { Object[] indexedRow = new Object[fieldIndex.size()]; float size = 0F; @@ -333,7 +338,8 @@ private float addRow( forkedStatsMap, subColumnFinder, defaultTimezone, - insertRowsCurrIndex) + insertRowsCurrIndex, + error) : SnowflakeParquetValueParser.parseColumnValueToParquet( value, column, @@ -346,6 +352,19 @@ private float addRow( size += valueWithSize.getSize(); } + if (error.getMissingNotNullColNames() != null + || error.getExtraColNames() != null + || error.getNullValueForNotNullColNames() != null) { + throw new SFException( + ErrorCode.INVALID_FORMAT_ROW, + String.format("Invalid row %d", insertRowsCurrIndex), + String.format( + "missingNotNullColNames=%s, extraColNames=%s, nullValueForNotNullColNames=%s", + error.getMissingNotNullColNames(), + error.getExtraColNames(), + error.getNullValueForNotNullColNames())); + } + long rowSizeRoundedUp = Double.valueOf(Math.ceil(size)).longValue(); if (rowSizeRoundedUp > clientBufferParameters.getMaxAllowedRowSizeInBytes()) { diff --git a/src/main/java/net/snowflake/ingest/utils/SubColumnFinder.java b/src/main/java/net/snowflake/ingest/utils/SubColumnFinder.java index 7e52c7eb6..7293d13bc 100644 --- a/src/main/java/net/snowflake/ingest/utils/SubColumnFinder.java +++ b/src/main/java/net/snowflake/ingest/utils/SubColumnFinder.java @@ -30,11 +30,13 @@ static class SubtreeInfo { /* Dot path of the node. */ final String dotPath; + final List path; - SubtreeInfo(int startTag, int endTag, String dotPath) { + SubtreeInfo(int startTag, int endTag, String dotPath, List path) { this.startTag = startTag; this.endTag = endTag; this.dotPath = dotPath; + this.path = path; } } @@ -77,6 +79,19 @@ public String getDotPath(Type.ID id) { return accessMap.get(id).dotPath; } + /** + * Get the path of a node in the schema. + * + * @param id Field ID of the node + * @return Path of the node + */ + public List getPath(Type.ID id) { + if (!accessMap.containsKey(id)) { + throw new IllegalArgumentException(String.format("Field %s not found in schema", id)); + } + return Collections.unmodifiableList(accessMap.get(id).path); + } + /** * Build the list of leaf columns in preorder traversal and the map of field id to the interval of * a node's leaf columns in the list. @@ -106,7 +121,10 @@ private void build(Type node, List currentPath) { accessMap.put( node.getId(), new SubtreeInfo( - startTag, list.size(), concatDotPath(currentPath.toArray(new String[0])))); + startTag, + list.size(), + concatDotPath(currentPath.toArray(new String[0])), + new ArrayList<>(currentPath))); } if (!currentPath.isEmpty()) { /* Remove the last element of the path at the end of recursion. 
*/ diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParserTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParserTest.java index c496e45a8..7fe76f7b1 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParserTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParserTest.java @@ -19,8 +19,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import net.snowflake.ingest.streaming.InsertValidationResponse; import net.snowflake.ingest.utils.Pair; -import net.snowflake.ingest.utils.SFException; import net.snowflake.ingest.utils.SubColumnFinder; import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; @@ -60,7 +60,7 @@ public void parseValueBoolean() { ParquetBufferValue pv = IcebergParquetValueParser.parseColumnValueToParquet( - true, type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0); + true, type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0, null); ParquetValueParserAssertionBuilder.newBuilder() .parquetBufferValue(pv) .rowBufferStats(rowBufferStats) @@ -85,7 +85,7 @@ public void parseValueInt() { }; ParquetBufferValue pv = IcebergParquetValueParser.parseColumnValueToParquet( - Integer.MAX_VALUE, type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0); + Integer.MAX_VALUE, type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0, null); ParquetValueParserAssertionBuilder.newBuilder() .parquetBufferValue(pv) .rowBufferStats(rowBufferStats) @@ -113,7 +113,13 @@ public void parseValueDecimalToInt() { }; ParquetBufferValue pv = IcebergParquetValueParser.parseColumnValueToParquet( - new BigDecimal("12345.6789"), type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0); + new BigDecimal("12345.6789"), + type, + rowBufferStatsMap, + mockSubColumnFinder, + UTC, + 0, + null); ParquetValueParserAssertionBuilder.newBuilder() .parquetBufferValue(pv) .rowBufferStats(rowBufferStats) @@ -141,7 +147,7 @@ public void parseValueDateToInt() { }; ParquetBufferValue pv = IcebergParquetValueParser.parseColumnValueToParquet( - "2024-01-01", type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0); + "2024-01-01", type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0, null); ParquetValueParserAssertionBuilder.newBuilder() .parquetBufferValue(pv) .rowBufferStats(rowBufferStats) @@ -166,7 +172,7 @@ public void parseValueLong() { }; ParquetBufferValue pv = IcebergParquetValueParser.parseColumnValueToParquet( - Long.MAX_VALUE, type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0); + Long.MAX_VALUE, type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0, null); ParquetValueParserAssertionBuilder.newBuilder() .parquetBufferValue(pv) .rowBufferStats(rowBufferStats) @@ -199,7 +205,8 @@ public void parseValueDecimalToLong() { rowBufferStatsMap, mockSubColumnFinder, UTC, - 0); + 0, + null); ParquetValueParserAssertionBuilder.newBuilder() .parquetBufferValue(pv) .rowBufferStats(rowBufferStats) @@ -227,7 +234,7 @@ public void parseValueTimeToLong() { }; ParquetBufferValue pv = IcebergParquetValueParser.parseColumnValueToParquet( - "12:34:56.789", type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0); + "12:34:56.789", type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0, null); ParquetValueParserAssertionBuilder.newBuilder() .parquetBufferValue(pv) .rowBufferStats(rowBufferStats) @@ -255,7 +262,13 @@ public void parseValueTimestampToLong() { }; ParquetBufferValue pv = 
IcebergParquetValueParser.parseColumnValueToParquet( - "2024-01-01T12:34:56.789+08:00", type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0); + "2024-01-01T12:34:56.789+08:00", + type, + rowBufferStatsMap, + mockSubColumnFinder, + UTC, + 0, + null); ParquetValueParserAssertionBuilder.newBuilder() .parquetBufferValue(pv) .rowBufferStats(rowBufferStats) @@ -283,7 +296,13 @@ public void parseValueTimestampTZToLong() { }; ParquetBufferValue pv = IcebergParquetValueParser.parseColumnValueToParquet( - "2024-01-01T12:34:56.789+08:00", type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0); + "2024-01-01T12:34:56.789+08:00", + type, + rowBufferStatsMap, + mockSubColumnFinder, + UTC, + 0, + null); ParquetValueParserAssertionBuilder.newBuilder() .parquetBufferValue(pv) .rowBufferStats(rowBufferStats) @@ -308,7 +327,7 @@ public void parseValueFloat() { }; ParquetBufferValue pv = IcebergParquetValueParser.parseColumnValueToParquet( - Float.MAX_VALUE, type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0); + Float.MAX_VALUE, type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0, null); ParquetValueParserAssertionBuilder.newBuilder() .parquetBufferValue(pv) .rowBufferStats(rowBufferStats) @@ -333,7 +352,7 @@ public void parseValueDouble() { }; ParquetBufferValue pv = IcebergParquetValueParser.parseColumnValueToParquet( - Double.MAX_VALUE, type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0); + Double.MAX_VALUE, type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0, null); ParquetValueParserAssertionBuilder.newBuilder() .parquetBufferValue(pv) .rowBufferStats(rowBufferStats) @@ -359,7 +378,7 @@ public void parseValueBinary() { byte[] value = "snowflake_to_the_moon".getBytes(); ParquetBufferValue pv = IcebergParquetValueParser.parseColumnValueToParquet( - value, type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0); + value, type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0, null); ParquetValueParserAssertionBuilder.newBuilder() .parquetBufferValue(pv) .rowBufferStats(rowBufferStats) @@ -389,7 +408,7 @@ public void parseValueStringToBinary() { String value = "snowflake_to_the_moon"; ParquetBufferValue pv = IcebergParquetValueParser.parseColumnValueToParquet( - value, type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0); + value, type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0, null); ParquetValueParserAssertionBuilder.newBuilder() .parquetBufferValue(pv) .rowBufferStats(rowBufferStats) @@ -421,7 +440,7 @@ public void parseValueFixed() { byte[] value = "snow".getBytes(); ParquetBufferValue pv = IcebergParquetValueParser.parseColumnValueToParquet( - value, type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0); + value, type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0, null); ParquetValueParserAssertionBuilder.newBuilder() .parquetBufferValue(pv) .rowBufferStats(rowBufferStats) @@ -452,7 +471,7 @@ public void parseValueDecimalToFixed() { BigDecimal value = new BigDecimal("1234567890.0123456789"); ParquetBufferValue pv = IcebergParquetValueParser.parseColumnValueToParquet( - value, type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0); + value, type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0, null); ParquetValueParserAssertionBuilder.newBuilder() .parquetBufferValue(pv) .rowBufferStats(rowBufferStats) @@ -480,10 +499,16 @@ public void parseList() throws JsonProcessingException { }; IcebergParquetValueParser.parseColumnValueToParquet( - null, list, rowBufferStatsMap, mockSubColumnFinder, UTC, 0); + null, list, rowBufferStatsMap, mockSubColumnFinder, UTC, 0, null); ParquetBufferValue pv = 
IcebergParquetValueParser.parseColumnValueToParquet( - Arrays.asList(1, 2, 3, 4, 5), list, rowBufferStatsMap, mockSubColumnFinder, UTC, 0); + Arrays.asList(1, 2, 3, 4, 5), + list, + rowBufferStatsMap, + mockSubColumnFinder, + UTC, + 0, + null); ParquetValueParserAssertionBuilder.newBuilder() .rowBufferStats(rowBufferStats) .parquetBufferValue(pv) @@ -503,14 +528,16 @@ public void parseList() throws JsonProcessingException { .element(Types.optional(PrimitiveTypeName.INT32).id(2).named("element")) .id(1) .named("LIST_COL"); - Assert.assertThrows( - SFException.class, - () -> - IcebergParquetValueParser.parseColumnValueToParquet( - null, requiredList, rowBufferStatsMap, mockSubColumnFinder, UTC, 0)); + InsertValidationResponse.InsertError error = new InsertValidationResponse.InsertError(null, 0); + IcebergParquetValueParser.parseColumnValueToParquet( + null, requiredList, rowBufferStatsMap, mockSubColumnFinder, UTC, 0, error); + Assert.assertNotNull(error.getNullValueForNotNullColNames()); + Assert.assertNull(error.getExtraColNames()); + Assert.assertNull(error.getMissingNotNullColNames()); + pv = IcebergParquetValueParser.parseColumnValueToParquet( - new ArrayList<>(), requiredList, rowBufferStatsMap, mockSubColumnFinder, UTC, 0); + new ArrayList<>(), requiredList, rowBufferStatsMap, mockSubColumnFinder, UTC, 0, null); ParquetValueParserAssertionBuilder.newBuilder() .rowBufferStats(rowBufferStats) .parquetBufferValue(pv) @@ -524,19 +551,21 @@ public void parseList() throws JsonProcessingException { /* Test required list with required elements */ Type requiredElements = Types.requiredList() - .element(Types.required(PrimitiveTypeName.INT32).id(1).named("element")) - .id(2) + .element(Types.required(PrimitiveTypeName.INT32).id(2).named("element")) + .id(1) .named("LIST_COL"); - Assert.assertThrows( - SFException.class, - () -> - IcebergParquetValueParser.parseColumnValueToParquet( - Collections.singletonList(null), - requiredElements, - rowBufferStatsMap, - mockSubColumnFinder, - UTC, - 0)); + error = new InsertValidationResponse.InsertError(null, 0); + IcebergParquetValueParser.parseColumnValueToParquet( + Collections.singletonList(null), + requiredElements, + rowBufferStatsMap, + mockSubColumnFinder, + UTC, + 0, + error); + Assert.assertNotNull(error.getNullValueForNotNullColNames()); + Assert.assertNull(error.getExtraColNames()); + Assert.assertNull(error.getMissingNotNullColNames()); } @Test @@ -557,7 +586,7 @@ public void parseMap() throws JsonProcessingException { } }; IcebergParquetValueParser.parseColumnValueToParquet( - null, map, rowBufferStatsMap, mockSubColumnFinder, UTC, 0); + null, map, rowBufferStatsMap, mockSubColumnFinder, UTC, 0, null); ParquetBufferValue pv = IcebergParquetValueParser.parseColumnValueToParquet( new java.util.HashMap() { @@ -570,7 +599,8 @@ public void parseMap() throws JsonProcessingException { rowBufferStatsMap, mockSubColumnFinder, UTC, - 0); + 0, + null); ParquetValueParserAssertionBuilder.newBuilder() .rowBufferStats(rowBufferKeyStats) .parquetBufferValue(pv) @@ -592,11 +622,13 @@ public void parseMap() throws JsonProcessingException { .value(Types.optional(PrimitiveTypeName.INT32).id(3).named("value")) .id(1) .named("MAP_COL"); - Assert.assertThrows( - SFException.class, - () -> - IcebergParquetValueParser.parseColumnValueToParquet( - null, requiredMap, rowBufferStatsMap, mockSubColumnFinder, UTC, 0)); + InsertValidationResponse.InsertError error = new InsertValidationResponse.InsertError(null, 0); + IcebergParquetValueParser.parseColumnValueToParquet( 
+ null, requiredMap, rowBufferStatsMap, mockSubColumnFinder, UTC, 0, error); + Assert.assertNotNull(error.getNullValueForNotNullColNames()); + Assert.assertNull(error.getExtraColNames()); + Assert.assertNull(error.getMissingNotNullColNames()); + pv = IcebergParquetValueParser.parseColumnValueToParquet( new java.util.HashMap(), @@ -604,7 +636,8 @@ public void parseMap() throws JsonProcessingException { rowBufferStatsMap, mockSubColumnFinder, UTC, - 0); + 0, + null); ParquetValueParserAssertionBuilder.newBuilder() .rowBufferStats(rowBufferKeyStats) .parquetBufferValue(pv) @@ -622,20 +655,22 @@ public void parseMap() throws JsonProcessingException { .value(Types.required(PrimitiveTypeName.INT32).id(3).named("value")) .id(1) .named("MAP_COL"); - Assert.assertThrows( - SFException.class, - () -> - IcebergParquetValueParser.parseColumnValueToParquet( - new java.util.HashMap() { - { - put(1, null); - } - }, - requiredValues, - rowBufferStatsMap, - mockSubColumnFinder, - UTC, - 0)); + error = new InsertValidationResponse.InsertError(null, 0); + IcebergParquetValueParser.parseColumnValueToParquet( + new java.util.HashMap() { + { + put(1, null); + } + }, + requiredValues, + rowBufferStatsMap, + mockSubColumnFinder, + UTC, + 0, + error); + Assert.assertNotNull(error.getNullValueForNotNullColNames()); + Assert.assertNull(error.getExtraColNames()); + Assert.assertNull(error.getMissingNotNullColNames()); } @Test @@ -662,35 +697,41 @@ public void parseStruct() throws JsonProcessingException { }; IcebergParquetValueParser.parseColumnValueToParquet( - null, struct, rowBufferStatsMap, mockSubColumnFinder, UTC, 0); - Assert.assertThrows( - SFException.class, - () -> - IcebergParquetValueParser.parseColumnValueToParquet( - new java.util.HashMap() { - { - put("a", 1); - } - }, - struct, - rowBufferStatsMap, - mockSubColumnFinder, - UTC, - 0)); - Assert.assertThrows( - SFException.class, - () -> - IcebergParquetValueParser.parseColumnValueToParquet( - new java.util.HashMap() { - { - put("c", 1); - } - }, - struct, - rowBufferStatsMap, - mockSubColumnFinder, - UTC, - 0)); + null, struct, rowBufferStatsMap, mockSubColumnFinder, UTC, 0, null); + InsertValidationResponse.InsertError error = new InsertValidationResponse.InsertError(null, 0); + IcebergParquetValueParser.parseColumnValueToParquet( + new java.util.HashMap() { + { + put("a", 1); + } + }, + struct, + rowBufferStatsMap, + mockSubColumnFinder, + UTC, + 0, + error); + Assert.assertNotNull(error.getMissingNotNullColNames()); + Assert.assertNull(error.getExtraColNames()); + Assert.assertNull(error.getNullValueForNotNullColNames()); + + error = new InsertValidationResponse.InsertError(null, 0); + IcebergParquetValueParser.parseColumnValueToParquet( + new java.util.HashMap() { + { + put("c", 1); + } + }, + struct, + rowBufferStatsMap, + mockSubColumnFinder, + UTC, + 0, + error); + Assert.assertNotNull(error.getMissingNotNullColNames()); + Assert.assertNotNull(error.getExtraColNames()); + Assert.assertNull(error.getNullValueForNotNullColNames()); + ParquetBufferValue pv = IcebergParquetValueParser.parseColumnValueToParquet( Collections.unmodifiableMap( @@ -704,7 +745,8 @@ public void parseStruct() throws JsonProcessingException { rowBufferStatsMap, mockSubColumnFinder, UTC, - 0); + 0, + null); ParquetValueParserAssertionBuilder.newBuilder() .parquetBufferValue(pv) .expectedValueClass(ArrayList.class) @@ -725,11 +767,14 @@ public void parseStruct() throws JsonProcessingException { .named("b")) .id(1) .named("STRUCT_COL"); - Assert.assertThrows( - 
SFException.class, - () -> - IcebergParquetValueParser.parseColumnValueToParquet( - null, requiredStruct, rowBufferStatsMap, mockSubColumnFinder, UTC, 0)); + + error = new InsertValidationResponse.InsertError(null, 0); + IcebergParquetValueParser.parseColumnValueToParquet( + null, requiredStruct, rowBufferStatsMap, mockSubColumnFinder, UTC, 0, error); + Assert.assertNotNull(error.getNullValueForNotNullColNames()); + Assert.assertNull(error.getExtraColNames()); + Assert.assertNull(error.getMissingNotNullColNames()); + pv = IcebergParquetValueParser.parseColumnValueToParquet( new java.util.HashMap(), @@ -737,7 +782,8 @@ public void parseStruct() throws JsonProcessingException { rowBufferStatsMap, mockSubColumnFinder, UTC, - 0); + 0, + null); ParquetValueParserAssertionBuilder.newBuilder() .parquetBufferValue(pv) .expectedValueClass(ArrayList.class) @@ -758,7 +804,7 @@ public void parseNestedTypes() { List reference = (List) res.getSecond(); ParquetBufferValue pv = IcebergParquetValueParser.parseColumnValueToParquet( - value, type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0); + value, type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0, null); ParquetValueParserAssertionBuilder.newBuilder() .parquetBufferValue(pv) .expectedValueClass(ArrayList.class) diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergStructuredIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergStructuredIT.java index 915b5512a..30cff7ac9 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergStructuredIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergStructuredIT.java @@ -4,12 +4,20 @@ package net.snowflake.ingest.streaming.internal.datatypes; +import static org.assertj.core.api.Assertions.assertThat; + import com.fasterxml.jackson.databind.JsonNode; import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.UUID; import net.snowflake.ingest.TestUtils; +import net.snowflake.ingest.streaming.InsertValidationResponse; +import net.snowflake.ingest.streaming.OpenChannelRequest; import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel; import net.snowflake.ingest.utils.Constants; import net.snowflake.ingest.utils.ErrorCode; @@ -236,6 +244,180 @@ public void testFieldName() throws Exception { val); } + @Test + public void testExtraFields() throws SQLException { + String tableName = createIcebergTable("object(k1 int)"); + SnowflakeStreamingIngestChannel channel = + openChannel(tableName, OpenChannelRequest.OnErrorOption.CONTINUE); + Map row = new HashMap<>(); + row.put("k2", 1); + row.put("k.3", 1); + row.put("k\\4", 1); + InsertValidationResponse insertValidationResponse = + channel.insertRow(createStreamingIngestRow(row), UUID.randomUUID().toString()); + assertThat(insertValidationResponse.getInsertErrors().size()).isEqualTo(1); + assertThat(insertValidationResponse.getInsertErrors().get(0).getExtraColNames()) + .containsOnly("VALUE.k2", "VALUE.k\\.3", "VALUE.k\\\\4"); + + tableName = createIcebergTable("map(string, array(object(k1 int)))"); + channel = openChannel(tableName, OpenChannelRequest.OnErrorOption.CONTINUE); + row.put( + "key1", + new ArrayList() { + { + add( + new HashMap() { + { + put("k2", 1); + } + }); + } + }); + insertValidationResponse = + channel.insertRow(createStreamingIngestRow(row), UUID.randomUUID().toString()); + 
assertThat(insertValidationResponse.getInsertErrors().size()).isEqualTo(1); + assertThat(insertValidationResponse.getInsertErrors().get(0).getExtraColNames()) + .containsOnly("VALUE.key_value.value.list.element.k2"); + } + + @Test + public void testNestedExtraFields() throws SQLException { + String tableName = + createIcebergTable( + "array(map(string, object(k1 int, k2 long, k3 string, k4 boolean, k5 double, k6" + + " fixed(10), k7 binary, k8 decimal(38, 10), k9 date, k10 time, k11 timestamp, k12" + + " timestamp_ltz)))"); + SnowflakeStreamingIngestChannel channel = + openChannel(tableName, OpenChannelRequest.OnErrorOption.CONTINUE); + List row = + Collections.singletonList( + Collections.singletonMap( + "key", + new HashMap() { + { + put("_k1", 1); + put("k1", 1); + put("_k2", 2); + put("k2", 2); + put("_k3", "3"); + put("k3", "3"); + put("_k4", true); + put("k4", true); + put("_k5", 5.0); + put("k5", 5.0); + put("_k6", "41424344454647484950"); + put("k6", "41424344454647484950"); + put("_k7", "41424344"); + put("k7", "41424344"); + put("_k8", "1234567890.1234567890"); + put("k8", "1234567890.1234567890"); + put("_k9", "2024-01-01"); + put("k9", "2024-01-01"); + put("_k10", "12:00:00"); + put("k10", "12:00:00"); + put("_k11", "2024-01-01T12:00:00.000000"); + put("k11", "2024-01-01T12:00:00.000000"); + put("_k12", "2024-01-01T12:00:00.000000+08:00"); + put("k12", "2024-01-01T12:00:00.000000+08:00"); + } + })); + InsertValidationResponse insertValidationResponse = + channel.insertRow(createStreamingIngestRow(row), UUID.randomUUID().toString()); + assertThat(insertValidationResponse.getInsertErrors().size()).isEqualTo(1); + assertThat(insertValidationResponse.getInsertErrors().get(0).getExtraColNames()) + .containsOnly( + "VALUE.list.element.key_value.value._k1", + "VALUE.list.element.key_value.value._k2", + "VALUE.list.element.key_value.value._k3", + "VALUE.list.element.key_value.value._k4", + "VALUE.list.element.key_value.value._k5", + "VALUE.list.element.key_value.value._k6", + "VALUE.list.element.key_value.value._k7", + "VALUE.list.element.key_value.value._k8", + "VALUE.list.element.key_value.value._k9", + "VALUE.list.element.key_value.value._k10", + "VALUE.list.element.key_value.value._k11", + "VALUE.list.element.key_value.value._k12"); + } + + @Test + public void testMissingFields() throws SQLException { + String tableName = createIcebergTable("object(k1 int not null, k2 int not null) not null"); + SnowflakeStreamingIngestChannel channel = + openChannel(tableName, OpenChannelRequest.OnErrorOption.CONTINUE); + + InsertValidationResponse insertValidationResponse = + channel.insertRow(createStreamingIngestRow(null), UUID.randomUUID().toString()); + assertThat(insertValidationResponse.getInsertErrors().size()).isEqualTo(1); + assertThat(insertValidationResponse.getInsertErrors().get(0).getNullValueForNotNullColNames()) + .containsOnly("VALUE"); + + insertValidationResponse = + channel.insertRow( + createStreamingIngestRow(new HashMap()), UUID.randomUUID().toString()); + assertThat(insertValidationResponse.getInsertErrors().size()).isEqualTo(1); + assertThat(insertValidationResponse.getInsertErrors().get(0).getMissingNotNullColNames()) + .containsOnly("VALUE.k1", "VALUE.k2"); + + insertValidationResponse = + channel.insertRow( + createStreamingIngestRow( + new HashMap() { + { + put("k1", null); + put("k2", null); + } + }), + UUID.randomUUID().toString()); + assertThat(insertValidationResponse.getInsertErrors().size()).isEqualTo(1); + 
assertThat(insertValidationResponse.getInsertErrors().get(0).getNullValueForNotNullColNames()) + .containsOnly("VALUE.k1", "VALUE.k2"); + } + + @Test + public void testMultipleErrors() throws Exception { + String tableName = + createIcebergTable( + "object(k1 int not null, k2 object(k3 int not null, k4 object(k5 int not null) not" + + " null) not null) not null"); + SnowflakeStreamingIngestChannel channel = + openChannel(tableName, OpenChannelRequest.OnErrorOption.ABORT); + + Assertions.assertThatThrownBy( + () -> + channel.insertRow( + createStreamingIngestRow( + new HashMap() { + { + put("k1", null); + put( + "k2", + new HashMap() { + { + put( + "k4", + new HashMap() { + { + put("k5", null); + put("k7", 1); + } + }); + put("k6", null); + } + }); + } + }), + UUID.randomUUID().toString())) + .isInstanceOf(SFException.class) + .hasMessage( + "The given row cannot be converted to the internal format: Invalid row 0. " + + "missingNotNullColNames=[VALUE.k2.k3], " + + "extraColNames=[VALUE.k2.k4.k7, VALUE.k2.k6], " + + "nullValueForNotNullColNames=[VALUE.k1, VALUE.k2.k4.k5]") + .extracting("vendorCode") + .isEqualTo(ErrorCode.INVALID_FORMAT_ROW.getMessageCode()); + } + private void assertStructuredDataType(String dataType, String value) throws Exception { String tableName = createIcebergTable(dataType); String offsetToken = UUID.randomUUID().toString(); @@ -255,7 +437,7 @@ private void assertStructuredDataType(String dataType, String value) throws Exce String tmp = res.getString(2); JsonNode actualNode = tmp == null ? null : objectMapper.readTree(tmp); JsonNode expectedNode = value == null ? null : objectMapper.readTree(value); - Assertions.assertThat(actualNode).isEqualTo(expectedNode); + assertThat(actualNode).isEqualTo(expectedNode); } private void assertMap(String dataType, Map value) throws Exception { @@ -274,6 +456,6 @@ private void assertMap(String dataType, Map value) throws Exception { String tmp = res.getString(2); JsonNode actualNode = tmp == null ? null : objectMapper.readTree(tmp); JsonNode expectedNode = value == null ? null : objectMapper.valueToTree(value); - Assertions.assertThat(actualNode).isEqualTo(expectedNode); + assertThat(actualNode).isEqualTo(expectedNode); } }
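
Reviewer note (not part of the patch): with this change, schema mismatches inside Iceberg structured columns (extra fields, missing non-nullable fields, and nulls passed to non-nullable fields) are accumulated on InsertValidationResponse.InsertError while the row is parsed, and ParquetRowBuffer.addRow then raises a single INVALID_FORMAT_ROW error per bad row summarizing all three lists. The sketch below is a hypothetical consumer-side illustration, not code from this patch; the channel setup, the column name "VALUE", and the row contents are assumptions. It shows how the aggregated details could be read when the channel is opened with OnErrorOption.CONTINUE, as in the new IT tests above.

import java.util.HashMap;
import java.util.Map;
import net.snowflake.ingest.streaming.InsertValidationResponse;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;

class StructuredErrorInspectionSketch {
  // Insert one row into a channel opened with OnErrorOption.CONTINUE and print the
  // structured-type validation details that this patch aggregates per row.
  static void insertAndReport(SnowflakeStreamingIngestChannel channel) {
    Map<String, Object> row = new HashMap<>();
    // Assumed Iceberg column VALUE of type object(k1 int not null); the empty map
    // should surface VALUE.k1 under getMissingNotNullColNames().
    row.put("VALUE", new HashMap<String, Object>());

    InsertValidationResponse response = channel.insertRow(row, "offset-0");
    if (response.hasErrors()) {
      for (InsertValidationResponse.InsertError error : response.getInsertErrors()) {
        // Each list is null when that class of problem did not occur for the row.
        System.out.println("extra columns:            " + error.getExtraColNames());
        System.out.println("missing not-null columns: " + error.getMissingNotNullColNames());
        System.out.println("null for not-null:        " + error.getNullValueForNotNullColNames());
      }
    }
  }
}

In ABORT mode the same information surfaces through the single SFException thrown from ParquetRowBuffer.addRow, as exercised by testMultipleErrors above.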