From d30e7feb70418e8be3c327bee4a4d23563c3b2aa Mon Sep 17 00:00:00 2001
From: Alec Huang
Date: Tue, 24 Sep 2024 16:46:32 -0700
Subject: [PATCH] Address comments

---
 .../internal/DataValidationUtil.java          | 24 ++++---
 .../internal/FileColumnProperties.java        | 44 +-----------
 .../internal/IcebergParquetValueParser.java   | 72 +++++++++++--------
 .../internal/ParquetBufferValue.java          | 11 +--
 .../streaming/internal/ParquetRowBuffer.java  | 37 +++++-----
 .../streaming/internal/RowBufferStats.java    |  8 +++
 .../net/snowflake/ingest/utils/Utils.java     | 18 +++++
 .../IcebergParquetValueParserTest.java        | 33 ++++++---
 8 files changed, 136 insertions(+), 111 deletions(-)

diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/DataValidationUtil.java b/src/main/java/net/snowflake/ingest/streaming/internal/DataValidationUtil.java
index 0712c68a0..d8b1cca15 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/DataValidationUtil.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/DataValidationUtil.java
@@ -1067,7 +1067,7 @@ static int validateAndParseBoolean(String columnName, Object input, long insertR
    * @param insertRowIndex Row index for error reporting
    * @return Object cast to Map
    */
-  static Map validateAndParseIcebergStruct(
+  static Map<String, Object> validateAndParseIcebergStruct(
       String columnName, Object input, long insertRowIndex) {
     if (!(input instanceof Map)) {
       throw typeNotAllowedException(
@@ -1077,14 +1077,16 @@ static Map validateAndParseIcebergStruct(
           new String[] {"Map"},
           insertRowIndex);
     }
-    if (!((Map) input).keySet().stream().allMatch(key -> key instanceof String)) {
-      throw new SFException(
-          ErrorCode.INVALID_FORMAT_ROW,
-          String.format(
-              "Flied name of a struct must be of type String, rowIndex:%d", insertRowIndex));
+    for (Object key : ((Map<?, ?>) input).keySet()) {
+      if (!(key instanceof String)) {
+        throw new SFException(
+            ErrorCode.INVALID_FORMAT_ROW,
+            String.format(
+                "Field name of a struct must be of type String, rowIndex:%d", insertRowIndex));
+      }
     }
 
-    return (Map) input;
+    return (Map<String, Object>) input;
   }
 
   /**
@@ -1099,13 +1101,13 @@ static Map validateAndParseIcebergStruct(
    * @param insertRowIndex Row index for error reporting
    * @return Object cast to Iterable
    */
-  static Iterable validateAndParseIcebergList(
+  static Iterable<Object> validateAndParseIcebergList(
       String columnName, Object input, long insertRowIndex) {
     if (!(input instanceof Iterable)) {
       throw typeNotAllowedException(
           columnName, input.getClass(), "LIST", new String[] {"Iterable"}, insertRowIndex);
     }
-    return (Iterable) input;
+    return (Iterable<Object>) input;
   }
 
   /**
@@ -1120,13 +1122,13 @@ static Iterable validateAndParseIcebergList(
    * @param insertRowIndex Row index for error reporting
    * @return Object cast to Map
    */
-  static Map validateAndParseIcebergMap(
+  static Map<Object, Object> validateAndParseIcebergMap(
       String columnName, Object input, long insertRowIndex) {
     if (!(input instanceof Map)) {
       throw typeNotAllowedException(
           columnName, input.getClass(), "MAP", new String[] {"Map"}, insertRowIndex);
     }
-    return (Map) input;
+    return (Map<Object, Object>) input;
   }
 
   static void checkValueInRange(
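[Editor's note — illustrative example, not part of the patch] The three validators above define the entry contract for Iceberg STRUCT/LIST/MAP column values: a struct must be a Map whose keys are all Strings, a list must be any Iterable, and a map must be any Map. A minimal, self-contained sketch of the struct key rule; IllegalArgumentException stands in for SFException(ErrorCode.INVALID_FORMAT_ROW), and all names and values are made up:

```java
import java.util.HashMap;
import java.util.Map;

class StructKeyCheckSketch {
  // Mirrors the new for-loop check: every key of a struct value must be a String.
  static void validateStructKeys(Map<?, ?> input) {
    for (Object key : input.keySet()) {
      if (!(key instanceof String)) {
        // The real code throws SFException(ErrorCode.INVALID_FORMAT_ROW, ...)
        throw new IllegalArgumentException("Field name of a struct must be of type String");
      }
    }
  }

  public static void main(String[] args) {
    Map<Object, Object> good = new HashMap<>();
    good.put("name", "alice");
    validateStructKeys(good); // passes: all keys are Strings

    Map<Object, Object> bad = new HashMap<>();
    bad.put(42, "oops");
    try {
      validateStructKeys(bad); // rejected: non-String key
    } catch (IllegalArgumentException expected) {
      System.out.println(expected.getMessage());
    }
  }
}
```

The per-key loop also fails as soon as the first bad key is seen, rather than scanning the whole key set as the removed stream-based check did.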
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/FileColumnProperties.java b/src/main/java/net/snowflake/ingest/streaming/internal/FileColumnProperties.java
index ca2b8398d..c65aa5f4b 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/FileColumnProperties.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/FileColumnProperties.java
@@ -5,19 +5,11 @@
 package net.snowflake.ingest.streaming.internal;
 
 import static net.snowflake.ingest.streaming.internal.BinaryStringUtils.truncateBytesAsHex;
-import static net.snowflake.ingest.utils.Constants.EP_NDV_UNKNOWN;
 
+import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import java.math.BigInteger;
 import java.util.Objects;
-import org.apache.parquet.column.statistics.BinaryStatistics;
-import org.apache.parquet.column.statistics.BooleanStatistics;
-import org.apache.parquet.column.statistics.DoubleStatistics;
-import org.apache.parquet.column.statistics.FloatStatistics;
-import org.apache.parquet.column.statistics.IntStatistics;
-import org.apache.parquet.column.statistics.LongStatistics;
-import org.apache.parquet.column.statistics.Statistics;
-import org.apache.parquet.schema.LogicalTypeAnnotation;
 
 /** Audit register endpoint/FileColumnPropertyDTO property list. */
 class FileColumnProperties {
@@ -60,6 +52,7 @@ class FileColumnProperties {
 
   FileColumnProperties(RowBufferStats stats, boolean setDefaultValues) {
     this.setColumnOrdinal(stats.getOrdinal());
+    this.setFieldId(stats.getFieldId());
     this.setCollation(stats.getCollationDefinitionString());
     this.setMaxIntValue(
         stats.getCurrentMaxIntValue() == null
@@ -98,38 +91,6 @@ class FileColumnProperties {
     this.setDistinctValues(stats.getDistinctValues());
   }
 
-  FileColumnProperties(int fieldId, Statistics statistics) {
-    this.setColumnOrdinal(fieldId);
-    this.setFieldId(fieldId);
-    this.setNullCount(statistics.getNumNulls());
-    this.setDistinctValues(EP_NDV_UNKNOWN);
-    this.setCollation(null);
-    this.setMaxStrNonCollated(null);
-    this.setMinStrNonCollated(null);
-
-    if (statistics instanceof BooleanStatistics) {
-      this.setMinIntValue(
-          ((BooleanStatistics) statistics).genericGetMin() ? BigInteger.ONE : BigInteger.ZERO);
-      this.setMaxIntValue(
-          ((BooleanStatistics) statistics).genericGetMax() ?
BigInteger.ONE : BigInteger.ZERO); - } else if (statistics instanceof IntStatistics || statistics instanceof LongStatistics) { - this.setMinIntValue(BigInteger.valueOf(((Number) statistics.genericGetMin()).longValue())); - this.setMaxIntValue(BigInteger.valueOf(((Number) statistics.genericGetMax()).longValue())); - } else if (statistics instanceof FloatStatistics || statistics instanceof DoubleStatistics) { - this.setMinRealValue((Double) statistics.genericGetMin()); - this.setMaxRealValue((Double) statistics.genericGetMax()); - } else if (statistics instanceof BinaryStatistics) { - if (statistics.type().getLogicalTypeAnnotation() - instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) { - this.setMinIntValue(new BigInteger(statistics.getMinBytes())); - this.setMaxIntValue(new BigInteger(statistics.getMaxBytes())); - } else { - this.setMinStrValue(truncateBytesAsHex(statistics.getMinBytes(), false)); - this.setMaxStrValue(truncateBytesAsHex(statistics.getMaxBytes(), true)); - } - } - } - @JsonProperty("columnId") public int getColumnOrdinal() { return columnOrdinal; @@ -140,6 +101,7 @@ public void setColumnOrdinal(int columnOrdinal) { } @JsonProperty("fieldId") + @JsonInclude(JsonInclude.Include.NON_DEFAULT) public int getFieldId() { return fieldId; } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParser.java b/src/main/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParser.java index d722d26ad..721731a84 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParser.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParser.java @@ -5,6 +5,7 @@ package net.snowflake.ingest.streaming.internal; import static net.snowflake.ingest.streaming.internal.DataValidationUtil.checkFixedLengthByteArray; +import static net.snowflake.ingest.utils.Utils.concatDotPath; import java.math.BigDecimal; import java.math.BigInteger; @@ -17,7 +18,9 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import net.snowflake.ingest.utils.Constants; import net.snowflake.ingest.utils.ErrorCode; import net.snowflake.ingest.utils.SFException; @@ -36,6 +39,9 @@ /** Parses a user Iceberg column value into Parquet internal representation for buffering. */ class IcebergParquetValueParser { + static final String THREE_LEVEL_MAP_GROUP_NAME = "key_value"; + static final String THREE_LEVEL_LIST_GROUP_NAME = "list"; + /** * Parses a user column value into Parquet internal representation for buffering. * @@ -65,20 +71,23 @@ private static ParquetBufferValue parseColumnValueToParquet( ZoneId defaultTimezone, long insertRowsCurrIndex, String path, - boolean isdDescendantsOfRepeatingGroup) { - path = (path == null || path.isEmpty()) ? type.getName() : path + "." 
+ type.getName();
+      boolean isDescendantsOfRepeatingGroup) {
+    path = concatDotPath(path, type.getName());
     float estimatedParquetSize = 0F;
+
+    if (type.isPrimitive()) {
+      if (!statsMap.containsKey(path)) {
+        throw new SFException(
+            ErrorCode.INTERNAL_ERROR, String.format("Stats not found for column: %s", path));
+      }
+    }
+
     if (value != null) {
       if (type.isPrimitive()) {
-        if (!statsMap.containsKey(path)) {
-          throw new SFException(
-              ErrorCode.INTERNAL_ERROR,
-              String.format("Stats not found for column: %s", type.getName()));
-        }
         RowBufferStats stats = statsMap.get(path);
         estimatedParquetSize += ParquetBufferValue.DEFINITION_LEVEL_ENCODING_BYTE_LEN;
         estimatedParquetSize +=
-            isdDescendantsOfRepeatingGroup
+            isDescendantsOfRepeatingGroup
                 ? ParquetBufferValue.REPETITION_LEVEL_ENCODING_BYTE_LEN
                 : 0;
         PrimitiveType primitiveType = type.asPrimitiveType();
@@ -148,7 +157,7 @@ private static ParquetBufferValue parseColumnValueToParquet(
               defaultTimezone,
               insertRowsCurrIndex,
               path,
-              isdDescendantsOfRepeatingGroup);
+              isDescendantsOfRepeatingGroup);
         }
       }
@@ -158,11 +167,6 @@ private static ParquetBufferValue parseColumnValueToParquet(
             ErrorCode.INVALID_FORMAT_ROW, type.getName(), "Passed null to non nullable field");
       }
       if (type.isPrimitive()) {
-        if (!statsMap.containsKey(path)) {
-          throw new SFException(
-              ErrorCode.INTERNAL_ERROR,
-              String.format("Stats not found for column: %s", type.getName()));
-        }
         statsMap.get(path).incCurrentNullCount();
       }
     }
@@ -357,7 +361,7 @@ private static int timeUnitToScale(LogicalTypeAnnotation.TimeUnit timeUnit) {
    * @param defaultTimezone default timezone to use for timestamp parsing
    * @param insertRowsCurrIndex Used for logging the row of index given in insertRows API
    * @param path dot path of the column
-   * @param isdDescendantsOfRepeatingGroup true if the column is a descendant of a repeating group,
+   * @param isDescendantsOfRepeatingGroup true if the column is a descendant of a repeating group,
    * @return list of parsed values
    */
   private static ParquetBufferValue getGroupValue(
@@ -367,7 +371,7 @@ private static ParquetBufferValue getGroupValue(
       ZoneId defaultTimezone,
       final long insertRowsCurrIndex,
       String path,
-      boolean isdDescendantsOfRepeatingGroup) {
+      boolean isDescendantsOfRepeatingGroup) {
     LogicalTypeAnnotation logicalTypeAnnotation = type.getLogicalTypeAnnotation();
     if (logicalTypeAnnotation == null) {
       return getStructValue(
@@ -377,7 +381,7 @@ private static ParquetBufferValue getGroupValue(
           defaultTimezone,
           insertRowsCurrIndex,
           path,
-          isdDescendantsOfRepeatingGroup);
+          isDescendantsOfRepeatingGroup);
     }
     if (logicalTypeAnnotation instanceof LogicalTypeAnnotation.ListLogicalTypeAnnotation) {
       return get3LevelListValue(value, type, statsMap, defaultTimezone, insertRowsCurrIndex, path);
@@ -390,7 +394,7 @@ private static ParquetBufferValue getGroupValue(
           defaultTimezone,
           insertRowsCurrIndex,
           path,
-          isdDescendantsOfRepeatingGroup);
+          isDescendantsOfRepeatingGroup);
     }
     throw new SFException(
         ErrorCode.UNKNOWN_DATA_TYPE, logicalTypeAnnotation, type.getClass().getSimpleName());
@@ -408,10 +412,11 @@ private static ParquetBufferValue getStructValue(
       ZoneId defaultTimezone,
       final long insertRowsCurrIndex,
       String path,
-      boolean isdDescendantsOfRepeatingGroup) {
-    Map structVal =
+      boolean isDescendantsOfRepeatingGroup) {
+    Map<String, Object> structVal =
         DataValidationUtil.validateAndParseIcebergStruct(
             type.getName(), value, insertRowsCurrIndex);
+    Set<String> extraFields = structVal.keySet();
     List<Object> listVal = new ArrayList<>(type.getFieldCount());
     float estimatedParquetSize = 0f;
     for (int i = 0; i < type.getFieldCount(); i++) {
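[Editor's note — illustrative example, not part of the patch] Two effects of the hunks above are worth calling out: the column path is now built with concatDotPath before any lookup, and the stats-existence check runs for primitive columns whether or not the value is null, reporting the full dot path instead of just the leaf name. A small sketch of the lookup behavior; the column names and the plain HashMap are hypothetical (the real map holds RowBufferStats values):

```java
import java.util.HashMap;
import java.util.Map;

class StatsLookupSketch {
  public static void main(String[] args) {
    // Sub-column stats are keyed by the full dot path of the column.
    Map<String, Object> statsMap = new HashMap<>();
    statsMap.put("STRUCT_COL.a", new Object());

    // Equivalent of concatDotPath(path, type.getName()) for a field "b".
    String path = String.join(".", "STRUCT_COL", "b");
    if (!statsMap.containsKey(path)) {
      // The real code throws SFException(ErrorCode.INTERNAL_ERROR, ...) here,
      // now with the full path, and even when the incoming value is null.
      System.out.println("Stats not found for column: " + path);
    }
  }
}
```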
@@ -423,10 +428,21 @@ private static ParquetBufferValue getStructValue(
               defaultTimezone,
               insertRowsCurrIndex,
               path,
-              isdDescendantsOfRepeatingGroup);
+              isDescendantsOfRepeatingGroup);
+      extraFields.remove(type.getFieldName(i));
       listVal.add(parsedValue.getValue());
       estimatedParquetSize += parsedValue.getSize();
     }
+    if (!extraFields.isEmpty()) {
+      extraFields =
+          extraFields.stream().map(f -> concatDotPath(path, f)).collect(Collectors.toSet());
+      throw new SFException(
+          ErrorCode.INVALID_FORMAT_ROW,
+          "Extra fields: " + extraFields,
+          String.format(
+              "Fields not present in the struct shouldn't be specified, rowIndex:%d",
+              insertRowsCurrIndex));
+    }
     return new ParquetBufferValue(listVal, estimatedParquetSize);
   }
 
@@ -444,7 +460,7 @@ private static ParquetBufferValue get3LevelListValue(
       ZoneId defaultTimezone,
       final long insertRowsCurrIndex,
       String path) {
-    Iterable iterableVal =
+    Iterable<Object> iterableVal =
         DataValidationUtil.validateAndParseIcebergList(type.getName(), value, insertRowsCurrIndex);
     List<Object> listVal = new ArrayList<>();
     final AtomicReference<Float> estimatedParquetSize = new AtomicReference<>(0f);
@@ -457,7 +473,7 @@ private static ParquetBufferValue get3LevelListValue(
               statsMap,
               defaultTimezone,
               insertRowsCurrIndex,
-              path,
+              concatDotPath(path, THREE_LEVEL_LIST_GROUP_NAME),
               true);
       listVal.add(Collections.singletonList(parsedValue.getValue()));
       estimatedParquetSize.updateAndGet(sz -> sz + parsedValue.getSize());
@@ -479,8 +495,8 @@ private static ParquetBufferValue get3LevelMapValue(
       ZoneId defaultTimezone,
       final long insertRowsCurrIndex,
       String path,
-      boolean isdDescendantsOfRepeatingGroup) {
-    Map mapVal =
+      boolean isDescendantsOfRepeatingGroup) {
+    Map<Object, Object> mapVal =
         DataValidationUtil.validateAndParseIcebergMap(type.getName(), value, insertRowsCurrIndex);
     List<Object> listVal = new ArrayList<>();
     final AtomicReference<Float> estimatedParquetSize = new AtomicReference<>(0f);
@@ -493,7 +509,7 @@ private static ParquetBufferValue get3LevelMapValue(
               statsMap,
               defaultTimezone,
               insertRowsCurrIndex,
-              path,
+              concatDotPath(path, THREE_LEVEL_MAP_GROUP_NAME),
               true);
       ParquetBufferValue parsedValue =
           parseColumnValueToParquet(
@@ -502,8 +518,8 @@ private static ParquetBufferValue get3LevelMapValue(
               statsMap,
               defaultTimezone,
               insertRowsCurrIndex,
-              path,
-              isdDescendantsOfRepeatingGroup);
+              concatDotPath(path, THREE_LEVEL_MAP_GROUP_NAME),
+              isDescendantsOfRepeatingGroup);
       listVal.add(Arrays.asList(parsedKey.getValue(), parsedValue.getValue()));
       estimatedParquetSize.updateAndGet(sz -> sz + parsedKey.getSize() + parsedValue.getSize());
     });
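[Editor's note — illustrative example, not part of the patch] With the two hunks above, list elements and map keys/values no longer inherit the parent's path directly; the repeated-group segment is inserted, using THREE_LEVEL_LIST_GROUP_NAME ("list") and THREE_LEVEL_MAP_GROUP_NAME ("key_value") defined earlier in this file. The stats keys for one list and one map column therefore look like this (column names are made up; String.join stands in for concatDotPath):

```java
class ThreeLevelPathSketch {
  public static void main(String[] args) {
    // Paths looked up in statsMap for a list column and a map column:
    System.out.println(String.join(".", "LIST_COL", "list", "element"));   // LIST_COL.list.element
    System.out.println(String.join(".", "MAP_COL", "key_value", "key"));   // MAP_COL.key_value.key
    System.out.println(String.join(".", "MAP_COL", "key_value", "value")); // MAP_COL.key_value.value
  }
}
```

This matches the renamed stats keys in the test changes further down (e.g. LIST_COL.list.element, MAP_COL.key_value.key).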
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetBufferValue.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetBufferValue.java
index 4602b0c9a..48987bd74 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetBufferValue.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetBufferValue.java
@@ -9,14 +9,13 @@ class ParquetBufferValue {
   // Parquet uses BitPacking to encode boolean, hence 1 bit per value
   public static final float BIT_ENCODING_BYTE_LEN = 1.0f / 8;
 
-  public static final float REPETITION_LEVEL_ENCODING_BYTE_LEN = 2.0f / 8;
-
-  /**
-   * On average parquet needs 2 bytes / 8 values for the RLE+bitpack encoded definition level.
+  /**
+   * On average parquet needs 2 bytes / 8 values for the RLE+bitpack encoded definition and
+   * repetition level.
    *
-   * There are two cases how definition level (0 for null values, 1 for non-null values) is
-   * encoded:
+   * There are two ways the definition and repetition levels (0 for null values, 1 for non-null
+   * values) are encoded:
    *
  • If there are at least 8 repeated values in a row, they are run-length encoded (length + * value itself). E.g. 11111111 -> 8 1 *
  • If there are less than 8 repeated values, they are written in group as part of a @@ -33,6 +32,8 @@ class ParquetBufferValue { */ public static final float DEFINITION_LEVEL_ENCODING_BYTE_LEN = 2.0f / 8; + public static final float REPETITION_LEVEL_ENCODING_BYTE_LEN = 2.0f / 8; + // Parquet stores length in 4 bytes before the actual data bytes public static final int BYTE_ARRAY_LENGTH_ENCODING_BYTE_LEN = 4; private final Object value; diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java index 8132a3798..ede0a9000 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java @@ -4,6 +4,8 @@ package net.snowflake.ingest.streaming.internal; +import static net.snowflake.ingest.utils.Utils.concatDotPath; + import java.math.BigDecimal; import java.math.BigInteger; import java.nio.charset.StandardCharsets; @@ -95,13 +97,15 @@ public void setupSchema(List columns) { if (!clientBufferParameters.getIsIcebergMode()) { this.statsMap.put( column.getInternalName(), - new RowBufferStats(column.getName(), column.getCollation(), column.getOrdinal(), 0)); + new RowBufferStats( + column.getName(), column.getCollation(), column.getOrdinal(), 0 /* fieldId */)); if (onErrorOption == OpenChannelRequest.OnErrorOption.ABORT || onErrorOption == OpenChannelRequest.OnErrorOption.SKIP_BATCH) { this.tempStatsMap.put( column.getInternalName(), - new RowBufferStats(column.getName(), column.getCollation(), column.getOrdinal(), 0)); + new RowBufferStats( + column.getName(), column.getCollation(), column.getOrdinal(), 0 /* fieldId */)); } } @@ -109,26 +113,27 @@ public void setupSchema(List columns) { } schema = new MessageType(PARQUET_MESSAGE_TYPE_NAME, parquetTypes); if (clientBufferParameters.getIsIcebergMode()) { - int ordinal = 0; - String prevParentColumnName = ""; - for (ColumnDescriptor columnDescriptor : schema.getColumns()) { - String parentColumnName = columnDescriptor.getPath()[0]; - if (!parentColumnName.equals(prevParentColumnName)) { - ordinal++; - prevParentColumnName = parentColumnName; - } - String subColumnName = String.join(".", columnDescriptor.getPath()); + for (ColumnDescriptor subColumnDescriptor : schema.getColumns()) { + String subColumnName = concatDotPath(subColumnDescriptor.getPath()); + + /* set fieldId to 0 for non-structured columns */ int fieldId = - parentColumnName.equals(subColumnName) + subColumnDescriptor.getPath().length == 1 ? 
0
-                : columnDescriptor.getPrimitiveType().getId().intValue();
-        RowBufferStats stats = new RowBufferStats(subColumnName, null, ordinal, fieldId);
-        this.statsMap.put(subColumnName, stats);
+                : subColumnDescriptor.getPrimitiveType().getId().intValue();
+        int ordinal = schema.getType(subColumnDescriptor.getPath()[0]).getId().intValue();
+
+        this.statsMap.put(
+            subColumnName,
+            new RowBufferStats(
+                subColumnName, null /* collationDefinitionString */, ordinal, fieldId));
         if (onErrorOption == OpenChannelRequest.OnErrorOption.ABORT
             || onErrorOption == OpenChannelRequest.OnErrorOption.SKIP_BATCH) {
           this.tempStatsMap.put(
-              subColumnName, new RowBufferStats(subColumnName, null, ordinal, fieldId));
+              subColumnName,
+              new RowBufferStats(
+                  subColumnName, null /* collationDefinitionString */, ordinal, fieldId));
         }
       }
     }
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/RowBufferStats.java b/src/main/java/net/snowflake/ingest/streaming/internal/RowBufferStats.java
index 5a1aa33b4..48f7b47d9 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/RowBufferStats.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/RowBufferStats.java
@@ -15,8 +15,16 @@
 /** Keeps track of the active EP stats, used to generate a file EP info */
 class RowBufferStats {
 
+  /* Ordinal of a column, one-based. */
   private final int ordinal;
+
+  /*
+   * Field id of a column.
+   * For FDN columns, it is always 0.
+   * For Iceberg columns, it is the nonzero Iceberg field id for sub-columns and zero otherwise.
+   */
   private final int fieldId;
+
   private byte[] currentMinStrValue;
   private byte[] currentMaxStrValue;
   private BigInteger currentMinIntValue;
diff --git a/src/main/java/net/snowflake/ingest/utils/Utils.java b/src/main/java/net/snowflake/ingest/utils/Utils.java
index 5220625da..e01e3d491 100644
--- a/src/main/java/net/snowflake/ingest/utils/Utils.java
+++ b/src/main/java/net/snowflake/ingest/utils/Utils.java
@@ -411,4 +411,22 @@ public static String getFullyQualifiedChannelName(
       String dbName, String schemaName, String tableName, String channelName) {
     return String.format("%s.%s.%s.%s", dbName, schemaName, tableName, channelName);
   }
+
+  /**
+   * Concatenate the given path segments with dots, skipping null or empty segments.
+   *
+   * @param path the path segments to concatenate
+   */
+  public static String concatDotPath(String... path) {
+    StringBuilder sb = new StringBuilder();
+    for (String p : path) {
+      if (!isNullOrEmpty(p)) {
+        if (sb.length() > 0) {
+          sb.append(".");
+        }
+        sb.append(p);
+      }
+    }
+    return sb.toString();
+  }
 }
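[Editor's note — illustrative example, not part of the patch] Expected outputs of the new helper, following directly from its skip-null-or-empty loop:

```java
// Assumes: import static net.snowflake.ingest.utils.Utils.concatDotPath;
assert concatDotPath("LIST_COL", "list", "element").equals("LIST_COL.list.element");
assert concatDotPath(null, "key_value", "key").equals("key_value.key"); // null segment skipped
assert concatDotPath("", "a").equals("a");                              // empty segment skipped
assert concatDotPath().equals("");                                      // no segments at all
```

Because null and empty segments are skipped before the separator is appended, the result never starts or ends with a dot, which is what lets parseColumnValueToParquet call it unconditionally with a possibly-null parent path.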
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParserTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParserTest.java
index 6ecf63960..2ffe18cc0 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParserTest.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParserTest.java
@@ -433,11 +433,11 @@ public void parseList() throws JsonProcessingException {
         Types.optionalList()
             .element(Types.optional(PrimitiveTypeName.INT32).named("element"))
             .named("LIST_COL");
-    RowBufferStats rowBufferStats = new RowBufferStats("LIST_COL.element");
+    RowBufferStats rowBufferStats = new RowBufferStats("LIST_COL.list.element");
     Map<String, RowBufferStats> rowBufferStatsMap =
         new HashMap<String, RowBufferStats>() {
           {
-            put("LIST_COL.element", rowBufferStats);
+            put("LIST_COL.list.element", rowBufferStats);
           }
         };
 
@@ -500,13 +500,13 @@ public void parseMap() throws JsonProcessingException {
             .key(Types.required(PrimitiveTypeName.INT32).named("key"))
             .value(Types.optional(PrimitiveTypeName.INT32).named("value"))
             .named("MAP_COL");
-    RowBufferStats rowBufferKeyStats = new RowBufferStats("MAP_COL.key");
-    RowBufferStats rowBufferValueStats = new RowBufferStats("MAP_COL.value");
+    RowBufferStats rowBufferKeyStats = new RowBufferStats("MAP_COL.key_value.key");
+    RowBufferStats rowBufferValueStats = new RowBufferStats("MAP_COL.key_value.value");
     Map<String, RowBufferStats> rowBufferStatsMap =
         new HashMap<String, RowBufferStats>() {
           {
-            put("MAP_COL.key", rowBufferKeyStats);
-            put("MAP_COL.value", rowBufferValueStats);
+            put("MAP_COL.key_value.key", rowBufferKeyStats);
+            put("MAP_COL.key_value.value", rowBufferValueStats);
           }
         };
     IcebergParquetValueParser.parseColumnValueToParquet(null, map, rowBufferStatsMap, UTC, 0);
@@ -615,13 +615,25 @@ public void parseStruct() throws JsonProcessingException {
                     rowBufferStatsMap,
                     UTC,
                     0));
+    Assert.assertThrows(
+        SFException.class,
+        () ->
+            IcebergParquetValueParser.parseColumnValueToParquet(
+                new java.util.HashMap<String, Object>() {
+                  {
+                    put("c", 1);
+                  }
+                },
+                struct,
+                rowBufferStatsMap,
+                UTC,
+                0));
     ParquetBufferValue pv =
         IcebergParquetValueParser.parseColumnValueToParquet(
             new java.util.HashMap<String, Object>() {
               {
                 // a is null
                 put("b", "2");
-                put("c", 1); // Ignored
               }
             },
             struct,
@@ -697,18 +709,19 @@ private static Type generateNestedTypeAndStats(
         return Types.optionalList()
             .element(
                 generateNestedTypeAndStats(
-                    depth - 1, "element", rowBufferStatsMap, path + ".element"))
+                    depth - 1, "element", rowBufferStatsMap, path + ".list.element"))
             .named(name);
       case 2:
         return Types.optionalGroup()
             .addField(generateNestedTypeAndStats(depth - 1, "a", rowBufferStatsMap, path + ".a"))
            .named(name);
       case 0:
-        rowBufferStatsMap.put(path + ".key", new RowBufferStats(path + ".key"));
+        rowBufferStatsMap.put(path + ".key_value.key", new RowBufferStats(path + ".key_value.key"));
         return Types.optionalMap()
             .key(Types.required(PrimitiveTypeName.INT32).named("key"))
             .value(
-                generateNestedTypeAndStats(depth - 1, "value", rowBufferStatsMap, path + ".value"))
+                generateNestedTypeAndStats(
+                    depth - 1, "value", rowBufferStatsMap, path + ".key_value.value"))
             .named(name);
     }
     return null;
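[Editor's note — illustrative example, not part of the patch] The new assertThrows block pins down the behavior change from getStructValue: an input key that is not a declared struct field used to be silently dropped (the removed // Ignored line) and now fails the row. A self-contained sketch of that check; the field names follow the test's struct, and println stands in for SFException(ErrorCode.INVALID_FORMAT_ROW):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Mirrors the new extra-field check: schema fields are removed from the input's
// key set, and anything left over fails the row instead of being ignored.
class ExtraFieldSketch {
  public static void main(String[] args) {
    Map<String, Object> structValue = new HashMap<>();
    structValue.put("b", "2");
    structValue.put("c", 1); // "c" is not a field of the struct in the test

    Set<String> extraFields = new HashSet<>(structValue.keySet());
    for (String schemaField : Arrays.asList("a", "b")) { // declared struct fields
      extraFields.remove(schemaField);
    }
    if (!extraFields.isEmpty()) {
      System.out.println("Extra fields: " + extraFields); // Extra fields: [c]
    }
  }
}
```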