From 70943f7c8f689aedaadc7e380d1f12af606c06a5 Mon Sep 17 00:00:00 2001 From: Alec Huang Date: Fri, 8 Nov 2024 13:57:17 -0800 Subject: [PATCH] SNOW-1787322 Fix InsertError for structured data type (#888) Currently `InsertError` doesn't populate extra columns, missing non-nullable columns, or null values for non-nullable columns in `InsertValidationResponse` when ingesting structured data types into Iceberg tables. This PR fixes that. Sub-columns are represented by their Parquet dot path, with dot and backslash characters in field names escaped. For example, column `x` in `map_col(string, object("a.b" array(object("c\d" object(x int)))))` has the path `MAP_COL.key_value.value.a\.b.list.element.c\\d.x` --- .../streaming/InsertValidationResponse.java | 39 ++- .../streaming/internal/AbstractRowBuffer.java | 28 ++- .../internal/IcebergParquetValueParser.java | 110 +++++--- .../streaming/internal/ParquetRowBuffer.java | 31 ++- .../ingest/utils/SubColumnFinder.java | 22 +- .../IcebergParquetValueParserTest.java | 236 +++++++++++------- .../datatypes/IcebergStructuredIT.java | 186 +++++++++++++- 7 files changed, 500 insertions(+), 152 deletions(-) diff --git a/src/main/java/net/snowflake/ingest/streaming/InsertValidationResponse.java b/src/main/java/net/snowflake/ingest/streaming/InsertValidationResponse.java index 31a9c416f..f3ccae363 100644 --- a/src/main/java/net/snowflake/ingest/streaming/InsertValidationResponse.java +++ b/src/main/java/net/snowflake/ingest/streaming/InsertValidationResponse.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 Snowflake Computing Inc. All rights reserved. + * Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved. */ package net.snowflake.ingest.streaming; @@ -115,6 +115,18 @@ public void setExtraColNames(List extraColNames) { this.extraColNames = extraColNames; } + /** + * Add an extra column name in the input row compared with the table schema + * + * @param extraColName the extra column name + */ + public void addExtraColName(String extraColName) { + if (extraColNames == null) { + extraColNames = new ArrayList<>(); + } + extraColNames.add(extraColName); + } + /** Get the list of extra column names in the input row compared with the table schema */ public List getExtraColNames() { return extraColNames; } @@ -125,6 +137,18 @@ public void setMissingNotNullColNames(List missingNotNullColNames) { this.missingNotNullColNames = missingNotNullColNames; } + /** + * Add a missing non-nullable column name in the input row compared with the table schema + * + * @param missingNotNullColName the missing non-nullable column name + */ + public void addMissingNotNullColName(String missingNotNullColName) { + if (missingNotNullColNames == null) { + missingNotNullColNames = new ArrayList<>(); + } + missingNotNullColNames.add(missingNotNullColName); + } + /** * Get the list of missing non-nullable column names in the input row compared with the table * schema @@ -141,6 +165,19 @@ public void setNullValueForNotNullColNames(List nullValueForNotNullColNa this.nullValueForNotNullColNames = nullValueForNotNullColNames; } + /** + * Add the name of a non-nullable column that has a null value in the input row compared with + * the table schema + * + * @param nullValueForNotNullColName the name of the non-nullable column that has a null value + */ + public void addNullValueForNotNullColName(String nullValueForNotNullColName) { + if (nullValueForNotNullColNames == null) { + nullValueForNotNullColNames = new ArrayList<>(); + } + nullValueForNotNullColNames.add(nullValueForNotNullColName); + } + /** * Get the list 
of names of non-nullable column which have null value in the input row compared * with the table schema diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java index 69558d5d9..00f79ad0c 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/AbstractRowBuffer.java @@ -162,7 +162,12 @@ public InsertValidationResponse insertRows( Set inputColumnNames = verifyInputColumns(row, error, rowIndex); rowsSizeInBytes += addRow( - row, rowBuffer.bufferedRowCount, rowBuffer.statsMap, inputColumnNames, rowIndex); + row, + rowBuffer.bufferedRowCount, + rowBuffer.statsMap, + inputColumnNames, + rowIndex, + error); rowBuffer.bufferedRowCount++; } catch (SFException e) { error.setException(e); @@ -200,7 +205,13 @@ public InsertValidationResponse insertRows( for (Map row : rows) { Set inputColumnNames = verifyInputColumns(row, null, tempRowCount); tempRowsSizeInBytes += - addTempRow(row, tempRowCount, rowBuffer.tempStatsMap, inputColumnNames, tempRowCount); + addTempRow( + row, + tempRowCount, + rowBuffer.tempStatsMap, + inputColumnNames, + tempRowCount, + new InsertValidationResponse.InsertError(row, 0) /* dummy error */); tempRowCount++; if ((long) rowBuffer.bufferedRowCount + tempRowCount >= Integer.MAX_VALUE) { throw new SFException(ErrorCode.INTERNAL_ERROR, "Row count reaches MAX value"); @@ -249,7 +260,8 @@ public InsertValidationResponse insertRows( try { Set inputColumnNames = verifyInputColumns(row, error, rowIndex); tempRowsSizeInBytes += - addTempRow(row, tempRowCount, rowBuffer.tempStatsMap, inputColumnNames, rowIndex); + addTempRow( + row, tempRowCount, rowBuffer.tempStatsMap, inputColumnNames, rowIndex, error); tempRowCount++; } catch (SFException e) { error.setException(e); @@ -575,6 +587,8 @@ public ChannelData flush() { * @param formattedInputColumnNames list of input column names after formatting * @param insertRowIndex Index of the rows given in insertRows API. Not the same as * bufferedRowIndex + * @param error Insert error object, used to populate error details when doing structured data + * type parsing * @return row size */ abstract float addRow( @@ -582,7 +596,8 @@ abstract float addRow( int bufferedRowIndex, Map statsMap, Set formattedInputColumnNames, - final long insertRowIndex); + final long insertRowIndex, + InsertValidationResponse.InsertError error); /** * Add an input row to the temporary row buffer. @@ -595,6 +610,8 @@ abstract float addRow( * @param statsMap column stats map * @param formattedInputColumnNames list of input column names after formatting * @param insertRowIndex index of the row being inserteed from User Input List + * @param error Insert error object, used to populate error details when doing structured data + * type parsing * @return row size */ abstract float addTempRow( @@ -602,7 +619,8 @@ abstract float addTempRow( int curRowIndex, Map statsMap, Set formattedInputColumnNames, - long insertRowIndex); + long insertRowIndex, + InsertValidationResponse.InsertError error); /** Move rows from the temporary buffer to the current row buffer. 
*/ abstract void moveTempRowsToActualBuffer(int tempRowCount); diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParser.java b/src/main/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParser.java index 3ad2f02eb..7a7ae5eab 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParser.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParser.java @@ -5,6 +5,7 @@ package net.snowflake.ingest.streaming.internal; import static net.snowflake.ingest.streaming.internal.DataValidationUtil.checkFixedLengthByteArray; +import static net.snowflake.ingest.utils.Utils.concatDotPath; import java.math.BigDecimal; import java.math.BigInteger; @@ -19,7 +20,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.stream.Collectors; +import net.snowflake.ingest.streaming.InsertValidationResponse; import net.snowflake.ingest.utils.Constants; import net.snowflake.ingest.utils.ErrorCode; import net.snowflake.ingest.utils.SFException; @@ -49,6 +50,7 @@ class IcebergParquetValueParser { * @param defaultTimezone default timezone to use for timestamp parsing * @param insertRowsCurrIndex Row index corresponding the row to parse (w.r.t input rows in * insertRows API, and not buffered row) + * @param error InsertError object to add errors * @return parsed value and byte size of Parquet internal representation */ static ParquetBufferValue parseColumnValueToParquet( @@ -57,10 +59,18 @@ static ParquetBufferValue parseColumnValueToParquet( Map statsMap, SubColumnFinder subColumnFinder, ZoneId defaultTimezone, - long insertRowsCurrIndex) { + long insertRowsCurrIndex, + InsertValidationResponse.InsertError error) { Utils.assertNotNull("Parquet column stats map", statsMap); return parseColumnValueToParquet( - value, type, statsMap, subColumnFinder, defaultTimezone, insertRowsCurrIndex, false); + value, + type, + statsMap, + subColumnFinder, + defaultTimezone, + insertRowsCurrIndex, + false /* isDescendantsOfRepeatingGroup */, + error); } private static ParquetBufferValue parseColumnValueToParquet( @@ -70,7 +80,8 @@ private static ParquetBufferValue parseColumnValueToParquet( SubColumnFinder subColumnFinder, ZoneId defaultTimezone, long insertRowsCurrIndex, - boolean isDescendantsOfRepeatingGroup) { + boolean isDescendantsOfRepeatingGroup, + InsertValidationResponse.InsertError error) { float estimatedParquetSize = 0F; if (type.getId() == null) { @@ -86,8 +97,8 @@ private static ParquetBufferValue parseColumnValueToParquet( } } + String path = subColumnFinder.getDotPath(id); if (value != null) { - String path = subColumnFinder.getDotPath(id); if (type.isPrimitive()) { RowBufferStats stats = statsMap.get(id.toString()); estimatedParquetSize += ParquetBufferValue.DEFINITION_LEVEL_ENCODING_BYTE_LEN; @@ -160,18 +171,14 @@ private static ParquetBufferValue parseColumnValueToParquet( subColumnFinder, defaultTimezone, insertRowsCurrIndex, - isDescendantsOfRepeatingGroup); + isDescendantsOfRepeatingGroup, + error); } } if (value == null) { if (type.isRepetition(Repetition.REQUIRED)) { - throw new SFException( - ErrorCode.INVALID_FORMAT_ROW, - subColumnFinder.getDotPath(id), - String.format( - "Passed null to non nullable field, rowIndex:%d, column:%s", - insertRowsCurrIndex, subColumnFinder.getDotPath(id))); + error.addNullValueForNotNullColName(path); } subColumnFinder .getSubColumns(id) @@ -381,6 +388,7 @@ private static int timeUnitToScale(LogicalTypeAnnotation.TimeUnit timeUnit) { 
* @param defaultTimezone default timezone to use for timestamp parsing * @param insertRowsCurrIndex Used for logging the row of index given in insertRows API * @param isDescendantsOfRepeatingGroup true if the column is a descendant of a repeating group, + * @param error InsertError object to add errors * @return list of parsed values */ private static ParquetBufferValue getGroupValue( @@ -390,7 +398,8 @@ private static ParquetBufferValue getGroupValue( SubColumnFinder subColumnFinder, ZoneId defaultTimezone, final long insertRowsCurrIndex, - boolean isDescendantsOfRepeatingGroup) { + boolean isDescendantsOfRepeatingGroup, + InsertValidationResponse.InsertError error) { LogicalTypeAnnotation logicalTypeAnnotation = type.getLogicalTypeAnnotation(); if (logicalTypeAnnotation == null) { return getStructValue( @@ -400,15 +409,16 @@ private static ParquetBufferValue getGroupValue( subColumnFinder, defaultTimezone, insertRowsCurrIndex, - isDescendantsOfRepeatingGroup); + isDescendantsOfRepeatingGroup, + error); } if (logicalTypeAnnotation instanceof LogicalTypeAnnotation.ListLogicalTypeAnnotation) { return get3LevelListValue( - value, type, statsMap, subColumnFinder, defaultTimezone, insertRowsCurrIndex); + value, type, statsMap, subColumnFinder, defaultTimezone, insertRowsCurrIndex, error); } if (logicalTypeAnnotation instanceof LogicalTypeAnnotation.MapLogicalTypeAnnotation) { return get3LevelMapValue( - value, type, statsMap, subColumnFinder, defaultTimezone, insertRowsCurrIndex); + value, type, statsMap, subColumnFinder, defaultTimezone, insertRowsCurrIndex, error); } throw new SFException( ErrorCode.UNKNOWN_DATA_TYPE, @@ -429,35 +439,48 @@ private static ParquetBufferValue getStructValue( SubColumnFinder subColumnFinder, ZoneId defaultTimezone, final long insertRowsCurrIndex, - boolean isDescendantsOfRepeatingGroup) { + boolean isDescendantsOfRepeatingGroup, + InsertValidationResponse.InsertError error) { Map structVal = DataValidationUtil.validateAndParseIcebergStruct( subColumnFinder.getDotPath(type.getId()), value, insertRowsCurrIndex); Set extraFields = new HashSet<>(structVal.keySet()); + List missingFields = new ArrayList<>(); List listVal = new ArrayList<>(type.getFieldCount()); float estimatedParquetSize = 0f; for (int i = 0; i < type.getFieldCount(); i++) { - ParquetBufferValue parsedValue = - parseColumnValueToParquet( - structVal.getOrDefault(type.getFieldName(i), null), - type.getType(i), - statsMap, - subColumnFinder, - defaultTimezone, - insertRowsCurrIndex, - isDescendantsOfRepeatingGroup); extraFields.remove(type.getFieldName(i)); - listVal.add(parsedValue.getValue()); - estimatedParquetSize += parsedValue.getSize(); + if (structVal.containsKey(type.getFieldName(i))) { + ParquetBufferValue parsedValue = + parseColumnValueToParquet( + structVal.get(type.getFieldName(i)), + type.getType(i), + statsMap, + subColumnFinder, + defaultTimezone, + insertRowsCurrIndex, + isDescendantsOfRepeatingGroup, + error); + listVal.add(parsedValue.getValue()); + estimatedParquetSize += parsedValue.getSize(); + } else { + if (type.getType(i).isRepetition(Repetition.REQUIRED)) { + missingFields.add(type.getFieldName(i)); + } else { + listVal.add(null); + } + } } - if (!extraFields.isEmpty()) { - String extraFieldsStr = extraFields.stream().collect(Collectors.joining(", ", "[", "]")); - throw new SFException( - ErrorCode.INVALID_FORMAT_ROW, - "Extra fields: " + extraFieldsStr, - String.format( - "Fields not present in the struct %s shouldn't be specified, rowIndex:%d", - 
subColumnFinder.getDotPath(type.getId()), insertRowsCurrIndex)); + + for (String missingField : missingFields) { + List missingFieldPath = new ArrayList<>(subColumnFinder.getPath(type.getId())); + missingFieldPath.add(missingField); + error.addMissingNotNullColName(concatDotPath(missingFieldPath.toArray(new String[0]))); + } + for (String extraField : extraFields) { + List extraFieldPath = new ArrayList<>(subColumnFinder.getPath(type.getId())); + extraFieldPath.add(extraField); + error.addExtraColName(concatDotPath(extraFieldPath.toArray(new String[0]))); } return new ParquetBufferValue(listVal, estimatedParquetSize); } @@ -475,7 +498,8 @@ private static ParquetBufferValue get3LevelListValue( Map statsMap, SubColumnFinder subColumnFinder, ZoneId defaultTimezone, - final long insertRowsCurrIndex) { + final long insertRowsCurrIndex, + InsertValidationResponse.InsertError error) { Iterable iterableVal = DataValidationUtil.validateAndParseIcebergList( subColumnFinder.getDotPath(type.getId()), value, insertRowsCurrIndex); @@ -490,7 +514,8 @@ private static ParquetBufferValue get3LevelListValue( subColumnFinder, defaultTimezone, insertRowsCurrIndex, - true); + true /* isDecedentOfRepeatingGroup */, + error); listVal.add(Collections.singletonList(parsedValue.getValue())); estimatedParquetSize += parsedValue.getSize(); } @@ -515,7 +540,8 @@ private static ParquetBufferValue get3LevelMapValue( Map statsMap, SubColumnFinder subColumnFinder, ZoneId defaultTimezone, - final long insertRowsCurrIndex) { + final long insertRowsCurrIndex, + InsertValidationResponse.InsertError error) { Map mapVal = DataValidationUtil.validateAndParseIcebergMap( subColumnFinder.getDotPath(type.getId()), value, insertRowsCurrIndex); @@ -530,7 +556,8 @@ private static ParquetBufferValue get3LevelMapValue( subColumnFinder, defaultTimezone, insertRowsCurrIndex, - true); + true /* isDecedentOfRepeatingGroup */, + error); ParquetBufferValue parsedValue = parseColumnValueToParquet( entry.getValue(), @@ -539,7 +566,8 @@ private static ParquetBufferValue get3LevelMapValue( subColumnFinder, defaultTimezone, insertRowsCurrIndex, - true); + true /* isDecedentOfRepeatingGroup */, + error); listVal.add(Arrays.asList(parsedKey.getValue(), parsedValue.getValue())); estimatedParquetSize += parsedKey.getSize() + parsedValue.getSize(); } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java index f5654d07b..96b261491 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java @@ -22,6 +22,7 @@ import net.snowflake.client.jdbc.internal.google.common.collect.Sets; import net.snowflake.ingest.connection.RequestBuilder; import net.snowflake.ingest.connection.TelemetryService; +import net.snowflake.ingest.streaming.InsertValidationResponse; import net.snowflake.ingest.streaming.OffsetTokenVerificationFunction; import net.snowflake.ingest.streaming.OpenChannelRequest; import net.snowflake.ingest.utils.Constants; @@ -275,8 +276,9 @@ float addRow( int bufferedRowIndex, Map statsMap, Set formattedInputColumnNames, - final long insertRowIndex) { - return addRow(row, this::writeRow, statsMap, formattedInputColumnNames, insertRowIndex); + final long insertRowIndex, + InsertValidationResponse.InsertError error) { + return addRow(row, this::writeRow, statsMap, formattedInputColumnNames, insertRowIndex, error); } void writeRow(List row) 
{ @@ -289,8 +291,9 @@ float addTempRow( int curRowIndex, Map statsMap, Set formattedInputColumnNames, - long insertRowIndex) { - return addRow(row, tempData::add, statsMap, formattedInputColumnNames, insertRowIndex); + long insertRowIndex, + InsertValidationResponse.InsertError error) { + return addRow(row, tempData::add, statsMap, formattedInputColumnNames, insertRowIndex, error); } /** @@ -303,6 +306,7 @@ float addTempRow( * @param insertRowsCurrIndex Row index of the input Rows passed in {@link * net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel#insertRows(Iterable, * String)} + * @param error insertion error object to populate if there is an error * @return row size */ private float addRow( @@ -310,7 +314,8 @@ private float addRow( Consumer> out, Map statsMap, Set inputColumnNames, - long insertRowsCurrIndex) { + long insertRowsCurrIndex, + InsertValidationResponse.InsertError error) { Object[] indexedRow = new Object[fieldIndex.size()]; float size = 0F; @@ -333,7 +338,8 @@ private float addRow( forkedStatsMap, subColumnFinder, defaultTimezone, - insertRowsCurrIndex) + insertRowsCurrIndex, + error) : SnowflakeParquetValueParser.parseColumnValueToParquet( value, column, @@ -346,6 +352,19 @@ private float addRow( size += valueWithSize.getSize(); } + if (error.getMissingNotNullColNames() != null + || error.getExtraColNames() != null + || error.getNullValueForNotNullColNames() != null) { + throw new SFException( + ErrorCode.INVALID_FORMAT_ROW, + String.format("Invalid row %d", insertRowsCurrIndex), + String.format( + "missingNotNullColNames=%s, extraColNames=%s, nullValueForNotNullColNames=%s", + error.getMissingNotNullColNames(), + error.getExtraColNames(), + error.getNullValueForNotNullColNames())); + } + long rowSizeRoundedUp = Double.valueOf(Math.ceil(size)).longValue(); if (rowSizeRoundedUp > clientBufferParameters.getMaxAllowedRowSizeInBytes()) { diff --git a/src/main/java/net/snowflake/ingest/utils/SubColumnFinder.java b/src/main/java/net/snowflake/ingest/utils/SubColumnFinder.java index 7e52c7eb6..7293d13bc 100644 --- a/src/main/java/net/snowflake/ingest/utils/SubColumnFinder.java +++ b/src/main/java/net/snowflake/ingest/utils/SubColumnFinder.java @@ -30,11 +30,13 @@ static class SubtreeInfo { /* Dot path of the node. */ final String dotPath; + final List path; - SubtreeInfo(int startTag, int endTag, String dotPath) { + SubtreeInfo(int startTag, int endTag, String dotPath, List path) { this.startTag = startTag; this.endTag = endTag; this.dotPath = dotPath; + this.path = path; } } @@ -77,6 +79,19 @@ public String getDotPath(Type.ID id) { return accessMap.get(id).dotPath; } + /** + * Get the path of a node in the schema. + * + * @param id Field ID of the node + * @return Path of the node + */ + public List getPath(Type.ID id) { + if (!accessMap.containsKey(id)) { + throw new IllegalArgumentException(String.format("Field %s not found in schema", id)); + } + return Collections.unmodifiableList(accessMap.get(id).path); + } + /** * Build the list of leaf columns in preorder traversal and the map of field id to the interval of * a node's leaf columns in the list. @@ -106,7 +121,10 @@ private void build(Type node, List currentPath) { accessMap.put( node.getId(), new SubtreeInfo( - startTag, list.size(), concatDotPath(currentPath.toArray(new String[0])))); + startTag, + list.size(), + concatDotPath(currentPath.toArray(new String[0])), + new ArrayList<>(currentPath))); } if (!currentPath.isEmpty()) { /* Remove the last element of the path at the end of recursion. 
*/ diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParserTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParserTest.java index c496e45a8..7fe76f7b1 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParserTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParserTest.java @@ -19,8 +19,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import net.snowflake.ingest.streaming.InsertValidationResponse; import net.snowflake.ingest.utils.Pair; -import net.snowflake.ingest.utils.SFException; import net.snowflake.ingest.utils.SubColumnFinder; import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; @@ -60,7 +60,7 @@ public void parseValueBoolean() { ParquetBufferValue pv = IcebergParquetValueParser.parseColumnValueToParquet( - true, type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0); + true, type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0, null); ParquetValueParserAssertionBuilder.newBuilder() .parquetBufferValue(pv) .rowBufferStats(rowBufferStats) @@ -85,7 +85,7 @@ public void parseValueInt() { }; ParquetBufferValue pv = IcebergParquetValueParser.parseColumnValueToParquet( - Integer.MAX_VALUE, type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0); + Integer.MAX_VALUE, type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0, null); ParquetValueParserAssertionBuilder.newBuilder() .parquetBufferValue(pv) .rowBufferStats(rowBufferStats) @@ -113,7 +113,13 @@ public void parseValueDecimalToInt() { }; ParquetBufferValue pv = IcebergParquetValueParser.parseColumnValueToParquet( - new BigDecimal("12345.6789"), type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0); + new BigDecimal("12345.6789"), + type, + rowBufferStatsMap, + mockSubColumnFinder, + UTC, + 0, + null); ParquetValueParserAssertionBuilder.newBuilder() .parquetBufferValue(pv) .rowBufferStats(rowBufferStats) @@ -141,7 +147,7 @@ public void parseValueDateToInt() { }; ParquetBufferValue pv = IcebergParquetValueParser.parseColumnValueToParquet( - "2024-01-01", type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0); + "2024-01-01", type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0, null); ParquetValueParserAssertionBuilder.newBuilder() .parquetBufferValue(pv) .rowBufferStats(rowBufferStats) @@ -166,7 +172,7 @@ public void parseValueLong() { }; ParquetBufferValue pv = IcebergParquetValueParser.parseColumnValueToParquet( - Long.MAX_VALUE, type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0); + Long.MAX_VALUE, type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0, null); ParquetValueParserAssertionBuilder.newBuilder() .parquetBufferValue(pv) .rowBufferStats(rowBufferStats) @@ -199,7 +205,8 @@ public void parseValueDecimalToLong() { rowBufferStatsMap, mockSubColumnFinder, UTC, - 0); + 0, + null); ParquetValueParserAssertionBuilder.newBuilder() .parquetBufferValue(pv) .rowBufferStats(rowBufferStats) @@ -227,7 +234,7 @@ public void parseValueTimeToLong() { }; ParquetBufferValue pv = IcebergParquetValueParser.parseColumnValueToParquet( - "12:34:56.789", type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0); + "12:34:56.789", type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0, null); ParquetValueParserAssertionBuilder.newBuilder() .parquetBufferValue(pv) .rowBufferStats(rowBufferStats) @@ -255,7 +262,13 @@ public void parseValueTimestampToLong() { }; ParquetBufferValue pv = 
IcebergParquetValueParser.parseColumnValueToParquet( - "2024-01-01T12:34:56.789+08:00", type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0); + "2024-01-01T12:34:56.789+08:00", + type, + rowBufferStatsMap, + mockSubColumnFinder, + UTC, + 0, + null); ParquetValueParserAssertionBuilder.newBuilder() .parquetBufferValue(pv) .rowBufferStats(rowBufferStats) @@ -283,7 +296,13 @@ public void parseValueTimestampTZToLong() { }; ParquetBufferValue pv = IcebergParquetValueParser.parseColumnValueToParquet( - "2024-01-01T12:34:56.789+08:00", type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0); + "2024-01-01T12:34:56.789+08:00", + type, + rowBufferStatsMap, + mockSubColumnFinder, + UTC, + 0, + null); ParquetValueParserAssertionBuilder.newBuilder() .parquetBufferValue(pv) .rowBufferStats(rowBufferStats) @@ -308,7 +327,7 @@ public void parseValueFloat() { }; ParquetBufferValue pv = IcebergParquetValueParser.parseColumnValueToParquet( - Float.MAX_VALUE, type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0); + Float.MAX_VALUE, type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0, null); ParquetValueParserAssertionBuilder.newBuilder() .parquetBufferValue(pv) .rowBufferStats(rowBufferStats) @@ -333,7 +352,7 @@ public void parseValueDouble() { }; ParquetBufferValue pv = IcebergParquetValueParser.parseColumnValueToParquet( - Double.MAX_VALUE, type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0); + Double.MAX_VALUE, type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0, null); ParquetValueParserAssertionBuilder.newBuilder() .parquetBufferValue(pv) .rowBufferStats(rowBufferStats) @@ -359,7 +378,7 @@ public void parseValueBinary() { byte[] value = "snowflake_to_the_moon".getBytes(); ParquetBufferValue pv = IcebergParquetValueParser.parseColumnValueToParquet( - value, type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0); + value, type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0, null); ParquetValueParserAssertionBuilder.newBuilder() .parquetBufferValue(pv) .rowBufferStats(rowBufferStats) @@ -389,7 +408,7 @@ public void parseValueStringToBinary() { String value = "snowflake_to_the_moon"; ParquetBufferValue pv = IcebergParquetValueParser.parseColumnValueToParquet( - value, type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0); + value, type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0, null); ParquetValueParserAssertionBuilder.newBuilder() .parquetBufferValue(pv) .rowBufferStats(rowBufferStats) @@ -421,7 +440,7 @@ public void parseValueFixed() { byte[] value = "snow".getBytes(); ParquetBufferValue pv = IcebergParquetValueParser.parseColumnValueToParquet( - value, type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0); + value, type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0, null); ParquetValueParserAssertionBuilder.newBuilder() .parquetBufferValue(pv) .rowBufferStats(rowBufferStats) @@ -452,7 +471,7 @@ public void parseValueDecimalToFixed() { BigDecimal value = new BigDecimal("1234567890.0123456789"); ParquetBufferValue pv = IcebergParquetValueParser.parseColumnValueToParquet( - value, type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0); + value, type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0, null); ParquetValueParserAssertionBuilder.newBuilder() .parquetBufferValue(pv) .rowBufferStats(rowBufferStats) @@ -480,10 +499,16 @@ public void parseList() throws JsonProcessingException { }; IcebergParquetValueParser.parseColumnValueToParquet( - null, list, rowBufferStatsMap, mockSubColumnFinder, UTC, 0); + null, list, rowBufferStatsMap, mockSubColumnFinder, UTC, 0, null); ParquetBufferValue pv = 
IcebergParquetValueParser.parseColumnValueToParquet( - Arrays.asList(1, 2, 3, 4, 5), list, rowBufferStatsMap, mockSubColumnFinder, UTC, 0); + Arrays.asList(1, 2, 3, 4, 5), + list, + rowBufferStatsMap, + mockSubColumnFinder, + UTC, + 0, + null); ParquetValueParserAssertionBuilder.newBuilder() .rowBufferStats(rowBufferStats) .parquetBufferValue(pv) @@ -503,14 +528,16 @@ public void parseList() throws JsonProcessingException { .element(Types.optional(PrimitiveTypeName.INT32).id(2).named("element")) .id(1) .named("LIST_COL"); - Assert.assertThrows( - SFException.class, - () -> - IcebergParquetValueParser.parseColumnValueToParquet( - null, requiredList, rowBufferStatsMap, mockSubColumnFinder, UTC, 0)); + InsertValidationResponse.InsertError error = new InsertValidationResponse.InsertError(null, 0); + IcebergParquetValueParser.parseColumnValueToParquet( + null, requiredList, rowBufferStatsMap, mockSubColumnFinder, UTC, 0, error); + Assert.assertNotNull(error.getNullValueForNotNullColNames()); + Assert.assertNull(error.getExtraColNames()); + Assert.assertNull(error.getMissingNotNullColNames()); + pv = IcebergParquetValueParser.parseColumnValueToParquet( - new ArrayList<>(), requiredList, rowBufferStatsMap, mockSubColumnFinder, UTC, 0); + new ArrayList<>(), requiredList, rowBufferStatsMap, mockSubColumnFinder, UTC, 0, null); ParquetValueParserAssertionBuilder.newBuilder() .rowBufferStats(rowBufferStats) .parquetBufferValue(pv) @@ -524,19 +551,21 @@ public void parseList() throws JsonProcessingException { /* Test required list with required elements */ Type requiredElements = Types.requiredList() - .element(Types.required(PrimitiveTypeName.INT32).id(1).named("element")) - .id(2) + .element(Types.required(PrimitiveTypeName.INT32).id(2).named("element")) + .id(1) .named("LIST_COL"); - Assert.assertThrows( - SFException.class, - () -> - IcebergParquetValueParser.parseColumnValueToParquet( - Collections.singletonList(null), - requiredElements, - rowBufferStatsMap, - mockSubColumnFinder, - UTC, - 0)); + error = new InsertValidationResponse.InsertError(null, 0); + IcebergParquetValueParser.parseColumnValueToParquet( + Collections.singletonList(null), + requiredElements, + rowBufferStatsMap, + mockSubColumnFinder, + UTC, + 0, + error); + Assert.assertNotNull(error.getNullValueForNotNullColNames()); + Assert.assertNull(error.getExtraColNames()); + Assert.assertNull(error.getMissingNotNullColNames()); } @Test @@ -557,7 +586,7 @@ public void parseMap() throws JsonProcessingException { } }; IcebergParquetValueParser.parseColumnValueToParquet( - null, map, rowBufferStatsMap, mockSubColumnFinder, UTC, 0); + null, map, rowBufferStatsMap, mockSubColumnFinder, UTC, 0, null); ParquetBufferValue pv = IcebergParquetValueParser.parseColumnValueToParquet( new java.util.HashMap() { @@ -570,7 +599,8 @@ public void parseMap() throws JsonProcessingException { rowBufferStatsMap, mockSubColumnFinder, UTC, - 0); + 0, + null); ParquetValueParserAssertionBuilder.newBuilder() .rowBufferStats(rowBufferKeyStats) .parquetBufferValue(pv) @@ -592,11 +622,13 @@ public void parseMap() throws JsonProcessingException { .value(Types.optional(PrimitiveTypeName.INT32).id(3).named("value")) .id(1) .named("MAP_COL"); - Assert.assertThrows( - SFException.class, - () -> - IcebergParquetValueParser.parseColumnValueToParquet( - null, requiredMap, rowBufferStatsMap, mockSubColumnFinder, UTC, 0)); + InsertValidationResponse.InsertError error = new InsertValidationResponse.InsertError(null, 0); + IcebergParquetValueParser.parseColumnValueToParquet( 
+ null, requiredMap, rowBufferStatsMap, mockSubColumnFinder, UTC, 0, error); + Assert.assertNotNull(error.getNullValueForNotNullColNames()); + Assert.assertNull(error.getExtraColNames()); + Assert.assertNull(error.getMissingNotNullColNames()); + pv = IcebergParquetValueParser.parseColumnValueToParquet( new java.util.HashMap(), @@ -604,7 +636,8 @@ public void parseMap() throws JsonProcessingException { rowBufferStatsMap, mockSubColumnFinder, UTC, - 0); + 0, + null); ParquetValueParserAssertionBuilder.newBuilder() .rowBufferStats(rowBufferKeyStats) .parquetBufferValue(pv) @@ -622,20 +655,22 @@ public void parseMap() throws JsonProcessingException { .value(Types.required(PrimitiveTypeName.INT32).id(3).named("value")) .id(1) .named("MAP_COL"); - Assert.assertThrows( - SFException.class, - () -> - IcebergParquetValueParser.parseColumnValueToParquet( - new java.util.HashMap() { - { - put(1, null); - } - }, - requiredValues, - rowBufferStatsMap, - mockSubColumnFinder, - UTC, - 0)); + error = new InsertValidationResponse.InsertError(null, 0); + IcebergParquetValueParser.parseColumnValueToParquet( + new java.util.HashMap() { + { + put(1, null); + } + }, + requiredValues, + rowBufferStatsMap, + mockSubColumnFinder, + UTC, + 0, + error); + Assert.assertNotNull(error.getNullValueForNotNullColNames()); + Assert.assertNull(error.getExtraColNames()); + Assert.assertNull(error.getMissingNotNullColNames()); } @Test @@ -662,35 +697,41 @@ public void parseStruct() throws JsonProcessingException { }; IcebergParquetValueParser.parseColumnValueToParquet( - null, struct, rowBufferStatsMap, mockSubColumnFinder, UTC, 0); - Assert.assertThrows( - SFException.class, - () -> - IcebergParquetValueParser.parseColumnValueToParquet( - new java.util.HashMap() { - { - put("a", 1); - } - }, - struct, - rowBufferStatsMap, - mockSubColumnFinder, - UTC, - 0)); - Assert.assertThrows( - SFException.class, - () -> - IcebergParquetValueParser.parseColumnValueToParquet( - new java.util.HashMap() { - { - put("c", 1); - } - }, - struct, - rowBufferStatsMap, - mockSubColumnFinder, - UTC, - 0)); + null, struct, rowBufferStatsMap, mockSubColumnFinder, UTC, 0, null); + InsertValidationResponse.InsertError error = new InsertValidationResponse.InsertError(null, 0); + IcebergParquetValueParser.parseColumnValueToParquet( + new java.util.HashMap() { + { + put("a", 1); + } + }, + struct, + rowBufferStatsMap, + mockSubColumnFinder, + UTC, + 0, + error); + Assert.assertNotNull(error.getMissingNotNullColNames()); + Assert.assertNull(error.getExtraColNames()); + Assert.assertNull(error.getNullValueForNotNullColNames()); + + error = new InsertValidationResponse.InsertError(null, 0); + IcebergParquetValueParser.parseColumnValueToParquet( + new java.util.HashMap() { + { + put("c", 1); + } + }, + struct, + rowBufferStatsMap, + mockSubColumnFinder, + UTC, + 0, + error); + Assert.assertNotNull(error.getMissingNotNullColNames()); + Assert.assertNotNull(error.getExtraColNames()); + Assert.assertNull(error.getNullValueForNotNullColNames()); + ParquetBufferValue pv = IcebergParquetValueParser.parseColumnValueToParquet( Collections.unmodifiableMap( @@ -704,7 +745,8 @@ public void parseStruct() throws JsonProcessingException { rowBufferStatsMap, mockSubColumnFinder, UTC, - 0); + 0, + null); ParquetValueParserAssertionBuilder.newBuilder() .parquetBufferValue(pv) .expectedValueClass(ArrayList.class) @@ -725,11 +767,14 @@ public void parseStruct() throws JsonProcessingException { .named("b")) .id(1) .named("STRUCT_COL"); - Assert.assertThrows( - 
SFException.class, - () -> - IcebergParquetValueParser.parseColumnValueToParquet( - null, requiredStruct, rowBufferStatsMap, mockSubColumnFinder, UTC, 0)); + + error = new InsertValidationResponse.InsertError(null, 0); + IcebergParquetValueParser.parseColumnValueToParquet( + null, requiredStruct, rowBufferStatsMap, mockSubColumnFinder, UTC, 0, error); + Assert.assertNotNull(error.getNullValueForNotNullColNames()); + Assert.assertNull(error.getExtraColNames()); + Assert.assertNull(error.getMissingNotNullColNames()); + pv = IcebergParquetValueParser.parseColumnValueToParquet( new java.util.HashMap(), @@ -737,7 +782,8 @@ public void parseStruct() throws JsonProcessingException { rowBufferStatsMap, mockSubColumnFinder, UTC, - 0); + 0, + null); ParquetValueParserAssertionBuilder.newBuilder() .parquetBufferValue(pv) .expectedValueClass(ArrayList.class) @@ -758,7 +804,7 @@ public void parseNestedTypes() { List reference = (List) res.getSecond(); ParquetBufferValue pv = IcebergParquetValueParser.parseColumnValueToParquet( - value, type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0); + value, type, rowBufferStatsMap, mockSubColumnFinder, UTC, 0, null); ParquetValueParserAssertionBuilder.newBuilder() .parquetBufferValue(pv) .expectedValueClass(ArrayList.class) diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergStructuredIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergStructuredIT.java index 915b5512a..30cff7ac9 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergStructuredIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergStructuredIT.java @@ -4,12 +4,20 @@ package net.snowflake.ingest.streaming.internal.datatypes; +import static org.assertj.core.api.Assertions.assertThat; + import com.fasterxml.jackson.databind.JsonNode; import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.UUID; import net.snowflake.ingest.TestUtils; +import net.snowflake.ingest.streaming.InsertValidationResponse; +import net.snowflake.ingest.streaming.OpenChannelRequest; import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel; import net.snowflake.ingest.utils.Constants; import net.snowflake.ingest.utils.ErrorCode; @@ -236,6 +244,180 @@ public void testFieldName() throws Exception { val); } + @Test + public void testExtraFields() throws SQLException { + String tableName = createIcebergTable("object(k1 int)"); + SnowflakeStreamingIngestChannel channel = + openChannel(tableName, OpenChannelRequest.OnErrorOption.CONTINUE); + Map row = new HashMap<>(); + row.put("k2", 1); + row.put("k.3", 1); + row.put("k\\4", 1); + InsertValidationResponse insertValidationResponse = + channel.insertRow(createStreamingIngestRow(row), UUID.randomUUID().toString()); + assertThat(insertValidationResponse.getInsertErrors().size()).isEqualTo(1); + assertThat(insertValidationResponse.getInsertErrors().get(0).getExtraColNames()) + .containsOnly("VALUE.k2", "VALUE.k\\.3", "VALUE.k\\\\4"); + + tableName = createIcebergTable("map(string, array(object(k1 int)))"); + channel = openChannel(tableName, OpenChannelRequest.OnErrorOption.CONTINUE); + row.put( + "key1", + new ArrayList() { + { + add( + new HashMap() { + { + put("k2", 1); + } + }); + } + }); + insertValidationResponse = + channel.insertRow(createStreamingIngestRow(row), UUID.randomUUID().toString()); + 
assertThat(insertValidationResponse.getInsertErrors().size()).isEqualTo(1); + assertThat(insertValidationResponse.getInsertErrors().get(0).getExtraColNames()) + .containsOnly("VALUE.key_value.value.list.element.k2"); + } + + @Test + public void testNestedExtraFields() throws SQLException { + String tableName = + createIcebergTable( + "array(map(string, object(k1 int, k2 long, k3 string, k4 boolean, k5 double, k6" + + " fixed(10), k7 binary, k8 decimal(38, 10), k9 date, k10 time, k11 timestamp, k12" + + " timestamp_ltz)))"); + SnowflakeStreamingIngestChannel channel = + openChannel(tableName, OpenChannelRequest.OnErrorOption.CONTINUE); + List row = + Collections.singletonList( + Collections.singletonMap( + "key", + new HashMap() { + { + put("_k1", 1); + put("k1", 1); + put("_k2", 2); + put("k2", 2); + put("_k3", "3"); + put("k3", "3"); + put("_k4", true); + put("k4", true); + put("_k5", 5.0); + put("k5", 5.0); + put("_k6", "41424344454647484950"); + put("k6", "41424344454647484950"); + put("_k7", "41424344"); + put("k7", "41424344"); + put("_k8", "1234567890.1234567890"); + put("k8", "1234567890.1234567890"); + put("_k9", "2024-01-01"); + put("k9", "2024-01-01"); + put("_k10", "12:00:00"); + put("k10", "12:00:00"); + put("_k11", "2024-01-01T12:00:00.000000"); + put("k11", "2024-01-01T12:00:00.000000"); + put("_k12", "2024-01-01T12:00:00.000000+08:00"); + put("k12", "2024-01-01T12:00:00.000000+08:00"); + } + })); + InsertValidationResponse insertValidationResponse = + channel.insertRow(createStreamingIngestRow(row), UUID.randomUUID().toString()); + assertThat(insertValidationResponse.getInsertErrors().size()).isEqualTo(1); + assertThat(insertValidationResponse.getInsertErrors().get(0).getExtraColNames()) + .containsOnly( + "VALUE.list.element.key_value.value._k1", + "VALUE.list.element.key_value.value._k2", + "VALUE.list.element.key_value.value._k3", + "VALUE.list.element.key_value.value._k4", + "VALUE.list.element.key_value.value._k5", + "VALUE.list.element.key_value.value._k6", + "VALUE.list.element.key_value.value._k7", + "VALUE.list.element.key_value.value._k8", + "VALUE.list.element.key_value.value._k9", + "VALUE.list.element.key_value.value._k10", + "VALUE.list.element.key_value.value._k11", + "VALUE.list.element.key_value.value._k12"); + } + + @Test + public void testMissingFields() throws SQLException { + String tableName = createIcebergTable("object(k1 int not null, k2 int not null) not null"); + SnowflakeStreamingIngestChannel channel = + openChannel(tableName, OpenChannelRequest.OnErrorOption.CONTINUE); + + InsertValidationResponse insertValidationResponse = + channel.insertRow(createStreamingIngestRow(null), UUID.randomUUID().toString()); + assertThat(insertValidationResponse.getInsertErrors().size()).isEqualTo(1); + assertThat(insertValidationResponse.getInsertErrors().get(0).getNullValueForNotNullColNames()) + .containsOnly("VALUE"); + + insertValidationResponse = + channel.insertRow( + createStreamingIngestRow(new HashMap()), UUID.randomUUID().toString()); + assertThat(insertValidationResponse.getInsertErrors().size()).isEqualTo(1); + assertThat(insertValidationResponse.getInsertErrors().get(0).getMissingNotNullColNames()) + .containsOnly("VALUE.k1", "VALUE.k2"); + + insertValidationResponse = + channel.insertRow( + createStreamingIngestRow( + new HashMap() { + { + put("k1", null); + put("k2", null); + } + }), + UUID.randomUUID().toString()); + assertThat(insertValidationResponse.getInsertErrors().size()).isEqualTo(1); + 
assertThat(insertValidationResponse.getInsertErrors().get(0).getNullValueForNotNullColNames()) + .containsOnly("VALUE.k1", "VALUE.k2"); + } + + @Test + public void testMultipleErrors() throws Exception { + String tableName = + createIcebergTable( + "object(k1 int not null, k2 object(k3 int not null, k4 object(k5 int not null) not" + + " null) not null) not null"); + SnowflakeStreamingIngestChannel channel = + openChannel(tableName, OpenChannelRequest.OnErrorOption.ABORT); + + Assertions.assertThatThrownBy( + () -> + channel.insertRow( + createStreamingIngestRow( + new HashMap() { + { + put("k1", null); + put( + "k2", + new HashMap() { + { + put( + "k4", + new HashMap() { + { + put("k5", null); + put("k7", 1); + } + }); + put("k6", null); + } + }); + } + }), + UUID.randomUUID().toString())) + .isInstanceOf(SFException.class) + .hasMessage( + "The given row cannot be converted to the internal format: Invalid row 0. " + + "missingNotNullColNames=[VALUE.k2.k3], " + + "extraColNames=[VALUE.k2.k4.k7, VALUE.k2.k6], " + + "nullValueForNotNullColNames=[VALUE.k1, VALUE.k2.k4.k5]") + .extracting("vendorCode") + .isEqualTo(ErrorCode.INVALID_FORMAT_ROW.getMessageCode()); + } + private void assertStructuredDataType(String dataType, String value) throws Exception { String tableName = createIcebergTable(dataType); String offsetToken = UUID.randomUUID().toString(); @@ -255,7 +437,7 @@ private void assertStructuredDataType(String dataType, String value) throws Exce String tmp = res.getString(2); JsonNode actualNode = tmp == null ? null : objectMapper.readTree(tmp); JsonNode expectedNode = value == null ? null : objectMapper.readTree(value); - Assertions.assertThat(actualNode).isEqualTo(expectedNode); + assertThat(actualNode).isEqualTo(expectedNode); } private void assertMap(String dataType, Map value) throws Exception { @@ -274,6 +456,6 @@ private void assertMap(String dataType, Map value) throws Exception { String tmp = res.getString(2); JsonNode actualNode = tmp == null ? null : objectMapper.readTree(tmp); JsonNode expectedNode = value == null ? null : objectMapper.valueToTree(value); - Assertions.assertThat(actualNode).isEqualTo(expectedNode); + assertThat(actualNode).isEqualTo(expectedNode); } }
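For illustration only, not part of the patch above: a minimal, hypothetical sketch of the dot-path escaping described in the commit message. The SDK's real helper is Utils.concatDotPath (used in the changes above, and whose exact behavior may differ); the escapeFieldName/joinDotPath names below are invented for this sketch and only assume the stated rule that a dot in a field name is escaped as \. and a backslash as \\.

// Hypothetical standalone sketch; the production code path goes through
// net.snowflake.ingest.utils.Utils.concatDotPath and SubColumnFinder instead.
public class DotPathEscapingSketch {

  // Escape backslashes first, then dots, so the backslash introduced while
  // escaping a dot is not itself escaped again.
  static String escapeFieldName(String fieldName) {
    return fieldName.replace("\\", "\\\\").replace(".", "\\.");
  }

  // Escape each raw field name and join the segments with '.' to form the sub-column dot path.
  static String joinDotPath(String... segments) {
    StringBuilder path = new StringBuilder();
    for (String segment : segments) {
      if (path.length() > 0) {
        path.append('.');
      }
      path.append(escapeFieldName(segment));
    }
    return path.toString();
  }

  public static void main(String[] args) {
    // Column `x` from the commit-message example; prints:
    // MAP_COL.key_value.value.a\.b.list.element.c\\d.x
    System.out.println(
        joinDotPath("MAP_COL", "key_value", "value", "a.b", "list", "element", "c\\d", "x"));
  }
}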