diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/DataValidationUtil.java b/src/main/java/net/snowflake/ingest/streaming/internal/DataValidationUtil.java index d8b1cca15..8d8bff3f5 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/DataValidationUtil.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/DataValidationUtil.java @@ -1082,7 +1082,8 @@ static int validateAndParseBoolean(String columnName, Object input, long insertR throw new SFException( ErrorCode.INVALID_FORMAT_ROW, String.format( - "Field name of a struct must be of type String, rowIndex:%d", insertRowIndex)); + "Field name of struct %s must be of type String, rowIndex:%d", + columnName, insertRowIndex)); } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParser.java b/src/main/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParser.java index 71af27366..963dbf188 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParser.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParser.java @@ -21,7 +21,6 @@ import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import net.snowflake.ingest.utils.Constants; import net.snowflake.ingest.utils.ErrorCode; @@ -396,14 +395,7 @@ private static ParquetBufferValue getGroupValue( return get3LevelListValue(value, type, statsMap, defaultTimezone, insertRowsCurrIndex, path); } if (logicalTypeAnnotation instanceof LogicalTypeAnnotation.MapLogicalTypeAnnotation) { - return get3LevelMapValue( - value, - type, - statsMap, - defaultTimezone, - insertRowsCurrIndex, - path, - isDescendantsOfRepeatingGroup); + return get3LevelMapValue(value, type, statsMap, defaultTimezone, insertRowsCurrIndex, path); } throw new SFException( ErrorCode.UNKNOWN_DATA_TYPE, logicalTypeAnnotation, type.getClass().getSimpleName()); @@ -442,11 +434,11 @@ private static ParquetBufferValue getStructValue( estimatedParquetSize += parsedValue.getSize(); } if (!extraFields.isEmpty()) { - extraFields = - extraFields.stream().map(f -> concatDotPath(path, f)).collect(Collectors.toSet()); + String extraFieldsStr = + extraFields.stream().map(f -> concatDotPath(path, f)).collect(Collectors.joining(", ")); throw new SFException( ErrorCode.INVALID_FORMAT_ROW, - "Extra fields: " + extraFields, + "Extra fields: " + extraFieldsStr, String.format( "Fields not present in the struct shouldn't be specified, rowIndex:%d", insertRowsCurrIndex)); @@ -471,22 +463,22 @@ private static ParquetBufferValue get3LevelListValue( Iterable iterableVal = DataValidationUtil.validateAndParseIcebergList(path, value, insertRowsCurrIndex); List listVal = new ArrayList<>(); - final AtomicReference estimatedParquetSize = new AtomicReference<>(0f); - iterableVal.forEach( - element -> { - ParquetBufferValue parsedValue = - parseColumnValueToParquet( - element, - type.getType(0).asGroupType().getType(0), - statsMap, - defaultTimezone, - insertRowsCurrIndex, - concatDotPath(path, THREE_LEVEL_LIST_GROUP_NAME), - true); - listVal.add(Collections.singletonList(parsedValue.getValue())); - estimatedParquetSize.updateAndGet(sz -> sz + parsedValue.getSize()); - }); - return new ParquetBufferValue(listVal, estimatedParquetSize.get()); + float estimatedParquetSize = 0; + String listGroupPath = concatDotPath(path, THREE_LEVEL_LIST_GROUP_NAME); + for (Object val : iterableVal) { + ParquetBufferValue parsedValue = + parseColumnValueToParquet( + val, + type.getType(0).asGroupType().getType(0), + statsMap, + defaultTimezone, + insertRowsCurrIndex, + listGroupPath, + true); + listVal.add(Collections.singletonList(parsedValue.getValue())); + estimatedParquetSize += parsedValue.getSize(); + } + return new ParquetBufferValue(listVal, estimatedParquetSize); } /** @@ -502,35 +494,34 @@ private static ParquetBufferValue get3LevelMapValue( Map statsMap, ZoneId defaultTimezone, final long insertRowsCurrIndex, - String path, - boolean isDescendantsOfRepeatingGroup) { + String path) { Map mapVal = DataValidationUtil.validateAndParseIcebergMap(path, value, insertRowsCurrIndex); List listVal = new ArrayList<>(); - final AtomicReference estimatedParquetSize = new AtomicReference<>(0f); - mapVal.forEach( - (k, v) -> { - ParquetBufferValue parsedKey = - parseColumnValueToParquet( - k, - type.getType(0).asGroupType().getType(0), - statsMap, - defaultTimezone, - insertRowsCurrIndex, - concatDotPath(path, THREE_LEVEL_MAP_GROUP_NAME), - true); - ParquetBufferValue parsedValue = - parseColumnValueToParquet( - v, - type.getType(0).asGroupType().getType(1), - statsMap, - defaultTimezone, - insertRowsCurrIndex, - concatDotPath(path, THREE_LEVEL_MAP_GROUP_NAME), - isDescendantsOfRepeatingGroup); - listVal.add(Arrays.asList(parsedKey.getValue(), parsedValue.getValue())); - estimatedParquetSize.updateAndGet(sz -> sz + parsedKey.getSize() + parsedValue.getSize()); - }); - return new ParquetBufferValue(listVal, estimatedParquetSize.get()); + float estimatedParquetSize = 0; + String mapGroupPath = concatDotPath(path, THREE_LEVEL_MAP_GROUP_NAME); + for (Map.Entry entry : mapVal.entrySet()) { + ParquetBufferValue parsedKey = + parseColumnValueToParquet( + entry.getKey(), + type.getType(0).asGroupType().getType(0), + statsMap, + defaultTimezone, + insertRowsCurrIndex, + mapGroupPath, + true); + ParquetBufferValue parsedValue = + parseColumnValueToParquet( + entry.getValue(), + type.getType(0).asGroupType().getType(1), + statsMap, + defaultTimezone, + insertRowsCurrIndex, + mapGroupPath, + true); + listVal.add(Arrays.asList(parsedKey.getValue(), parsedValue.getValue())); + estimatedParquetSize += parsedKey.getSize() + parsedValue.getSize(); + } + return new ParquetBufferValue(listVal, estimatedParquetSize); } } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParserTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParserTest.java index 2ffe18cc0..007dc3e23 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParserTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParserTest.java @@ -530,7 +530,8 @@ public void parseMap() throws JsonProcessingException { convertToArrayList(objectMapper.readValue("[[1, 1], [2, 2]]", ArrayList.class))) .expectedSize( (4.0f + REPETITION_LEVEL_ENCODING_BYTE_LEN + DEFINITION_LEVEL_ENCODING_BYTE_LEN) * 2 - + (4.0f + DEFINITION_LEVEL_ENCODING_BYTE_LEN) * 2) + + (4.0f + REPETITION_LEVEL_ENCODING_BYTE_LEN + DEFINITION_LEVEL_ENCODING_BYTE_LEN) + * 2) .expectedMin(BigInteger.valueOf(1)) .expectedMax(BigInteger.valueOf(2)) .assertMatches(); @@ -630,12 +631,13 @@ public void parseStruct() throws JsonProcessingException { 0)); ParquetBufferValue pv = IcebergParquetValueParser.parseColumnValueToParquet( - new java.util.HashMap() { - { - // a is null - put("b", "2"); - } - }, + Collections.unmodifiableMap( + new java.util.HashMap() { + { + // a is null + put("b", "2"); + } + }), struct, rowBufferStatsMap, UTC,