From ba48f45835dd097bf0d12f27fb3964d488d20357 Mon Sep 17 00:00:00 2001
From: Alec Huang
Date: Mon, 4 Nov 2024 13:58:05 -0800
Subject: [PATCH] Key column stats and sub-column lookups by Iceberg field id
 instead of dot path

---
 .../internal/IcebergParquetValueParser.java   |  75 +--
 .../streaming/internal/ParquetRowBuffer.java  |  11 +-
 .../ingest/utils/IcebergDataTypeParser.java   |  82 ++-
 .../ingest/utils/SubColumnFinder.java         |  49 +-
 .../net/snowflake/ingest/utils/Utils.java     |   9 +-
 .../IcebergParquetValueParserTest.java        | 109 ++--
 .../streaming/internal/RowBufferTest.java     | 598 ++++++++++++++----
 .../datatypes/AbstractDataTypeTest.java       |  11 +-
 .../internal/datatypes/IcebergDateTimeIT.java |   4 +
 .../datatypes/IcebergStructuredIT.java        | 119 +++-
 .../ingest/utils/SubColumnFinderTest.java     |  50 +-
 11 files changed, 844 insertions(+), 273 deletions(-)

diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParser.java b/src/main/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParser.java
index 98c2182c7..2a982aa4f 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParser.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParser.java
@@ -5,8 +5,6 @@
 package net.snowflake.ingest.streaming.internal;
 
 import static net.snowflake.ingest.streaming.internal.DataValidationUtil.checkFixedLengthByteArray;
-import static net.snowflake.ingest.utils.Utils.concatDotPath;
-import static net.snowflake.ingest.utils.Utils.isNullOrEmpty;
 
 import java.math.BigDecimal;
 import java.math.BigInteger;
@@ -41,9 +39,6 @@
 /** Parses a user Iceberg column value into Parquet internal representation for buffering. */
 class IcebergParquetValueParser {
 
-  static final String THREE_LEVEL_MAP_GROUP_NAME = "key_value";
-  static final String THREE_LEVEL_LIST_GROUP_NAME = "list";
-
   /**
    * Parses a user column value into Parquet internal representation for buffering.
    *
@@ -65,7 +60,7 @@ static ParquetBufferValue parseColumnValueToParquet(
       long insertRowsCurrIndex) {
     Utils.assertNotNull("Parquet column stats map", statsMap);
     return parseColumnValueToParquet(
-        value, type, statsMap, subColumnFinder, defaultTimezone, insertRowsCurrIndex, null, false);
+        value, type, statsMap, subColumnFinder, defaultTimezone, insertRowsCurrIndex, false);
   }
 
   private static ParquetBufferValue parseColumnValueToParquet(
@@ -75,21 +70,27 @@ private static ParquetBufferValue parseColumnValueToParquet(
       SubColumnFinder subColumnFinder,
       ZoneId defaultTimezone,
       long insertRowsCurrIndex,
-      String path,
       boolean isDescendantsOfRepeatingGroup) {
-    path = isNullOrEmpty(path) ? type.getName() : concatDotPath(path, type.getName());
     float estimatedParquetSize = 0F;
+    if (type.getId() == null) {
+      throw new SFException(
+          ErrorCode.INTERNAL_ERROR, String.format("Id not found for field: %s", type.getName()));
+    }
+    String id = type.getId().toString();
+
     if (type.isPrimitive()) {
-      if (!statsMap.containsKey(path)) {
+      if (!statsMap.containsKey(id)) {
         throw new SFException(
-            ErrorCode.INTERNAL_ERROR, String.format("Stats not found for column: %s", path));
+            ErrorCode.INTERNAL_ERROR,
+            String.format("Stats not found for fieldId: %s", type.getId()));
       }
     }
 
     if (value != null) {
+      String path = subColumnFinder.getDotPath(id);
       if (type.isPrimitive()) {
-        RowBufferStats stats = statsMap.get(path);
+        RowBufferStats stats = statsMap.get(id);
         estimatedParquetSize += ParquetBufferValue.DEFINITION_LEVEL_ENCODING_BYTE_LEN;
         estimatedParquetSize +=
             isDescendantsOfRepeatingGroup
@@ -160,7 +161,6 @@ private static ParquetBufferValue parseColumnValueToParquet(
               subColumnFinder,
               defaultTimezone,
               insertRowsCurrIndex,
-              path,
               isDescendantsOfRepeatingGroup);
       }
     }
@@ -169,13 +169,13 @@ private static ParquetBufferValue parseColumnValueToParquet(
       if (type.isRepetition(Repetition.REQUIRED)) {
         throw new SFException(
             ErrorCode.INVALID_FORMAT_ROW,
-            path,
+            subColumnFinder.getDotPath(id),
             String.format(
                 "Passed null to non nullable field, rowIndex:%d, column:%s",
-                insertRowsCurrIndex, path));
+                insertRowsCurrIndex, subColumnFinder.getDotPath(id)));
       }
       subColumnFinder
-          .getSubColumns(path)
+          .getSubColumns(id)
           .forEach(subColumn -> statsMap.get(subColumn).incCurrentNullCount());
     }
 
@@ -381,7 +381,6 @@ private static int timeUnitToScale(LogicalTypeAnnotation.TimeUnit timeUnit) {
    * @param subColumnFinder helper class to find stats of sub-columns
    * @param defaultTimezone default timezone to use for timestamp parsing
    * @param insertRowsCurrIndex Used for logging the row of index given in insertRows API
-   * @param path dot path of the column
    * @param isDescendantsOfRepeatingGroup true if the column is a descendant of a repeating group,
    * @return list of parsed values
    */
@@ -392,7 +391,6 @@ private static ParquetBufferValue getGroupValue(
       final long insertRowsCurrIndex,
-      String path,
       boolean isDescendantsOfRepeatingGroup) {
     LogicalTypeAnnotation logicalTypeAnnotation = type.getLogicalTypeAnnotation();
     if (logicalTypeAnnotation == null) {
@@ -403,19 +401,21 @@ private static ParquetBufferValue getGroupValue(
           subColumnFinder,
           defaultTimezone,
           insertRowsCurrIndex,
-          path,
           isDescendantsOfRepeatingGroup);
     }
     if (logicalTypeAnnotation instanceof LogicalTypeAnnotation.ListLogicalTypeAnnotation) {
       return get3LevelListValue(
-          value, type, statsMap, subColumnFinder, defaultTimezone, insertRowsCurrIndex, path);
+          value, type, statsMap, subColumnFinder, defaultTimezone, insertRowsCurrIndex);
     }
     if (logicalTypeAnnotation instanceof LogicalTypeAnnotation.MapLogicalTypeAnnotation) {
       return get3LevelMapValue(
-          value, type, statsMap, subColumnFinder, defaultTimezone, insertRowsCurrIndex, path);
+          value, type, statsMap, subColumnFinder, defaultTimezone, insertRowsCurrIndex);
     }
     throw new SFException(
-        ErrorCode.UNKNOWN_DATA_TYPE, path, logicalTypeAnnotation, type.getClass().getSimpleName());
+        ErrorCode.UNKNOWN_DATA_TYPE,
+        subColumnFinder.getDotPath(type.getId().toString()),
+        logicalTypeAnnotation,
+        type.getClass().getSimpleName());
   }
 
   /**
   *
@@ -430,10 +430,10 @@ private static ParquetBufferValue getStructValue(
       SubColumnFinder subColumnFinder,
       ZoneId defaultTimezone,
       final long insertRowsCurrIndex,
-      String path,
       boolean isDescendantsOfRepeatingGroup) {
     Map structVal =
-        DataValidationUtil.validateAndParseIcebergStruct(path, value, insertRowsCurrIndex);
+        DataValidationUtil.validateAndParseIcebergStruct(
+            subColumnFinder.getDotPath(type.getId().toString()), value, insertRowsCurrIndex);
     Set extraFields = new HashSet<>(structVal.keySet());
     List listVal = new ArrayList<>(type.getFieldCount());
     float estimatedParquetSize = 0f;
@@ -446,21 +446,19 @@ private static ParquetBufferValue getStructValue(
               subColumnFinder,
               defaultTimezone,
               insertRowsCurrIndex,
-              path,
               isDescendantsOfRepeatingGroup);
       extraFields.remove(type.getFieldName(i));
       listVal.add(parsedValue.getValue());
       estimatedParquetSize += parsedValue.getSize();
     }
     if (!extraFields.isEmpty()) {
-      String extraFieldsStr =
-          extraFields.stream().map(f -> concatDotPath(path, f)).collect(Collectors.joining(", "));
+      String extraFieldsStr = extraFields.stream().collect(Collectors.joining(", ", "[", "]"));
       throw new SFException(
           ErrorCode.INVALID_FORMAT_ROW,
           "Extra fields: " + extraFieldsStr,
           String.format(
-              "Fields not present in the struct shouldn't be specified, rowIndex:%d",
-              insertRowsCurrIndex));
+              "Fields not present in the struct %s shouldn't be specified, rowIndex:%d",
+              subColumnFinder.getDotPath(type.getId().toString()), insertRowsCurrIndex));
     }
     return new ParquetBufferValue(listVal, estimatedParquetSize);
   }
@@ -478,13 +476,12 @@ private static ParquetBufferValue get3LevelListValue(
       Map<String, RowBufferStats> statsMap,
       SubColumnFinder subColumnFinder,
       ZoneId defaultTimezone,
-      final long insertRowsCurrIndex,
-      String path) {
+      final long insertRowsCurrIndex) {
     Iterable iterableVal =
-        DataValidationUtil.validateAndParseIcebergList(path, value, insertRowsCurrIndex);
+        DataValidationUtil.validateAndParseIcebergList(
+            subColumnFinder.getDotPath(type.getId().toString()), value, insertRowsCurrIndex);
     List listVal = new ArrayList<>();
     float estimatedParquetSize = 0;
-    String listGroupPath = concatDotPath(path, THREE_LEVEL_LIST_GROUP_NAME);
     for (Object val : iterableVal) {
       ParquetBufferValue parsedValue =
           parseColumnValueToParquet(
@@ -494,14 +491,13 @@ private static ParquetBufferValue get3LevelListValue(
               subColumnFinder,
               defaultTimezone,
               insertRowsCurrIndex,
-              listGroupPath,
               true);
       listVal.add(Collections.singletonList(parsedValue.getValue()));
       estimatedParquetSize += parsedValue.getSize();
     }
     if (listVal.isEmpty()) {
       subColumnFinder
-          .getSubColumns(path)
+          .getSubColumns(type.getId().toString())
           .forEach(subColumn -> statsMap.get(subColumn).incCurrentNullCount());
     }
     return new ParquetBufferValue(listVal, estimatedParquetSize);
@@ -520,13 +516,12 @@ private static ParquetBufferValue get3LevelMapValue(
       Map<String, RowBufferStats> statsMap,
       SubColumnFinder subColumnFinder,
       ZoneId defaultTimezone,
-      final long insertRowsCurrIndex,
-      String path) {
+      final long insertRowsCurrIndex) {
     Map mapVal =
-        DataValidationUtil.validateAndParseIcebergMap(path, value, insertRowsCurrIndex);
+        DataValidationUtil.validateAndParseIcebergMap(
+            subColumnFinder.getDotPath(type.getId().toString()), value, insertRowsCurrIndex);
     List listVal = new ArrayList<>();
     float estimatedParquetSize = 0;
-    String mapGroupPath = concatDotPath(path, THREE_LEVEL_MAP_GROUP_NAME);
     for (Map.Entry entry : mapVal.entrySet()) {
       ParquetBufferValue parsedKey =
           parseColumnValueToParquet(
@@ -536,7 +531,6 @@ private static ParquetBufferValue get3LevelMapValue(
               subColumnFinder,
               defaultTimezone,
               insertRowsCurrIndex,
-              mapGroupPath,
               true);
       ParquetBufferValue parsedValue =
           parseColumnValueToParquet(
@@ -546,14 +540,13 @@ private static ParquetBufferValue get3LevelMapValue(
               subColumnFinder,
               defaultTimezone,
               insertRowsCurrIndex,
-              mapGroupPath,
               true);
       listVal.add(Arrays.asList(parsedKey.getValue(), parsedValue.getValue()));
       estimatedParquetSize += parsedKey.getSize() + parsedValue.getSize();
     }
     if (listVal.isEmpty()) {
       subColumnFinder
-          .getSubColumns(path)
+          .getSubColumns(type.getId().toString())
          .forEach(subColumn -> statsMap.get(subColumn).incCurrentNullCount());
     }
     return new ParquetBufferValue(listVal, estimatedParquetSize);
diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java
index 000d34493..5338655fb 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java
@@ -224,7 +224,7 @@ public void setupSchema(List<ColumnMetadata> columns) {
           int ordinal = schema.getType(columnDescriptor.getPath()[0]).getId().intValue();
 
           this.statsMap.put(
-              columnDotPath,
+              primitiveType.getId().toString(),
               new RowBufferStats(
                   columnDotPath,
                   null /* collationDefinitionString */,
@@ -237,7 +237,7 @@ public void setupSchema(List<ColumnMetadata> columns) {
           if (onErrorOption == OpenChannelRequest.OnErrorOption.ABORT
               || onErrorOption == OpenChannelRequest.OnErrorOption.SKIP_BATCH) {
             this.tempStatsMap.put(
-                columnDotPath,
+                primitiveType.getId().toString(),
                 new RowBufferStats(
                     columnDotPath,
                     null /* collationDefinitionString */,
@@ -364,12 +364,13 @@ private float addRow(
       throw new SFException(ErrorCode.INTERNAL_ERROR, "SubColumnFinder is not initialized.");
     }
 
-    for (String subColumn : subColumnFinder.getSubColumns(columnName)) {
-      RowBufferStats stats = statsMap.get(subColumn);
+    for (String subColumnId :
+        subColumnFinder.getSubColumns(fieldIndex.get(columnName).type.getId().toString())) {
+      RowBufferStats stats = statsMap.get(subColumnId);
       if (stats == null) {
         throw new SFException(
             ErrorCode.INTERNAL_ERROR,
-            String.format("Column %s not found in stats map.", subColumn));
+            String.format("Field id %s not found in stats map.", subColumnId));
       }
       stats.incCurrentNullCount();
     }
diff --git a/src/main/java/net/snowflake/ingest/utils/IcebergDataTypeParser.java b/src/main/java/net/snowflake/ingest/utils/IcebergDataTypeParser.java
index 53b4892c2..b2eb03cda 100644
--- a/src/main/java/net/snowflake/ingest/utils/IcebergDataTypeParser.java
+++ b/src/main/java/net/snowflake/ingest/utils/IcebergDataTypeParser.java
@@ -44,6 +44,8 @@ public class IcebergDataTypeParser {
   private static final String ELEMENT_REQUIRED = "element-required";
   private static final String VALUE_REQUIRED = "value-required";
 
+  private static final String EMPTY_FIELD_CHAR = "\\";
+
   /** Object mapper for this class */
   private static final ObjectMapper MAPPER = new ObjectMapper();
 
@@ -65,16 +67,21 @@ public static org.apache.parquet.schema.Type parseIcebergDataTypeStringToParquet
       int id,
       String name) {
     Type icebergType = deserializeIcebergType(icebergDataType);
+    org.apache.parquet.schema.Type parquetType;
     if (icebergType.isPrimitiveType()) {
-      return typeToMessageType.primitive(icebergType.asPrimitiveType(), repetition, id, name);
+      parquetType =
+          typeToMessageType.primitive(icebergType.asPrimitiveType(), repetition, id, name);
     } else {
       switch (icebergType.typeId()) {
         case LIST:
-          return typeToMessageType.list(icebergType.asListType(), repetition, id, name);
+          parquetType = typeToMessageType.list(icebergType.asListType(), repetition, id, name);
+          break;
         case MAP:
-          return typeToMessageType.map(icebergType.asMapType(), repetition, id, name);
+          parquetType = typeToMessageType.map(icebergType.asMapType(), repetition, id, name);
+          break;
         case STRUCT:
-          return typeToMessageType.struct(icebergType.asStructType(), repetition, id, name);
+          parquetType = typeToMessageType.struct(icebergType.asStructType(), repetition, id, name);
+          break;
         default:
           throw new SFException(
               ErrorCode.INTERNAL_ERROR,
              String.format(
                  name, icebergDataType));
       }
     }
+    return replaceWithOriginalFieldName(parquetType, icebergType, name);
   }
 
   /**
@@ -154,7 +162,14 @@ public static Type getTypeFromJson(@Nonnull JsonNode jsonNode) {
             field.isObject(), "Cannot parse struct field from non-object: %s", field);
 
         int id = JsonUtil.getInt(ID, field);
-        String name = JsonUtil.getString(NAME, field);
+
+        /* TypeToMessageType throws on an empty field name; represent an empty name with a backslash and escape any existing backslashes by doubling them. */
+        String name =
+            JsonUtil.getString(NAME, field)
+                .replace(EMPTY_FIELD_CHAR, EMPTY_FIELD_CHAR + EMPTY_FIELD_CHAR);
+        if (name.isEmpty()) {
+          name = EMPTY_FIELD_CHAR;
+        }
         Type type = getTypeFromJson(field.get(TYPE));
 
         String doc = JsonUtil.getStringOrNull(DOC, field);
@@ -208,4 +223,61 @@ public static Types.MapType mapFromJson(JsonNode json) {
       return Types.MapType.ofOptional(keyId, valueId, keyType, valueType);
     }
   }
+
+  private static org.apache.parquet.schema.Type replaceWithOriginalFieldName(
+      org.apache.parquet.schema.Type parquetType, Type icebergType, String fieldName) {
+    if (parquetType.isPrimitive() != icebergType.isPrimitiveType()
+        || (!parquetType.isPrimitive()
+            && parquetType.getLogicalTypeAnnotation()
+                == null /* ignore outer layer of map or list */
+            && parquetType.asGroupType().getFieldCount()
+                != icebergType.asNestedType().fields().size())) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Parquet type and Iceberg type mismatch: %s, %s", parquetType, icebergType));
+    }
+    if (parquetType.isPrimitive()) {
+      /* rename field name */
+      return org.apache.parquet.schema.Types.primitive(
+              parquetType.asPrimitiveType().getPrimitiveTypeName(), parquetType.getRepetition())
+          .as(parquetType.asPrimitiveType().getLogicalTypeAnnotation())
+          .id(parquetType.getId().intValue())
+          .length(parquetType.asPrimitiveType().getTypeLength())
+          .named(fieldName);
+    } else {
+      org.apache.parquet.schema.Types.GroupBuilder builder =
+          org.apache.parquet.schema.Types.buildGroup(parquetType.getRepetition());
+      for (org.apache.parquet.schema.Type parquetFieldType :
+          parquetType.asGroupType().getFields()) {
+        if (parquetFieldType.getId() == null) {
+          /* Middle layer of a map or list. Skip this level, since Parquet uses a 3-level list/map representation while Iceberg uses a 2-level one. */
+          builder.addField(
+              replaceWithOriginalFieldName(
+                  parquetFieldType, icebergType, parquetFieldType.getName()));
+        } else {
+          Types.NestedField icebergField =
+              icebergType.asNestedType().field(parquetFieldType.getId().intValue());
+          if (icebergField == null) {
+            throw new IllegalArgumentException(
+                String.format(
+                    "Cannot find Iceberg field with id %d in Iceberg type: %s",
+                    parquetFieldType.getId().intValue(), icebergType));
+          }
+          builder.addField(
+              replaceWithOriginalFieldName(
+                  parquetFieldType,
+                  icebergField.type(),
+                  icebergField.name().equals(EMPTY_FIELD_CHAR)
"" + : icebergField + .name() + .replace(EMPTY_FIELD_CHAR + EMPTY_FIELD_CHAR, EMPTY_FIELD_CHAR))); + } + } + if (parquetType.getId() != null) { + builder.id(parquetType.getId().intValue()); + } + return builder.as(parquetType.getLogicalTypeAnnotation()).named(fieldName); + } + } } diff --git a/src/main/java/net/snowflake/ingest/utils/SubColumnFinder.java b/src/main/java/net/snowflake/ingest/utils/SubColumnFinder.java index 2b71eea27..340c51d3d 100644 --- a/src/main/java/net/snowflake/ingest/utils/SubColumnFinder.java +++ b/src/main/java/net/snowflake/ingest/utils/SubColumnFinder.java @@ -5,7 +5,6 @@ package net.snowflake.ingest.utils; import static net.snowflake.ingest.utils.Utils.concatDotPath; -import static net.snowflake.ingest.utils.Utils.isNullOrEmpty; import java.util.ArrayList; import java.util.Collections; @@ -15,20 +14,22 @@ import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.Type; -/** Helper class to find all leaf sub-columns in an immutable schema given a dot path. */ +/** Helper class to find all leaf sub-columns in an immutable schema given a fieldId. */ public class SubColumnFinder { - static class SubtreeInterval { + static class SubtreeInfo { final int startTag; final int endTag; + final String dotPath; - SubtreeInterval(int startTag, int endTag) { + SubtreeInfo(int startTag, int endTag, String dotPath) { this.startTag = startTag; this.endTag = endTag; + this.dotPath = dotPath; } } private final List list; - private final Map accessMap; + private final Map accessMap; public SubColumnFinder(MessageType schema) { accessMap = new HashMap<>(); @@ -36,34 +37,44 @@ public SubColumnFinder(MessageType schema) { build(schema, null); } - public List getSubColumns(String dotPath) { - if (!accessMap.containsKey(dotPath)) { - throw new IllegalArgumentException(String.format("Column %s not found in schema", dotPath)); + public List getSubColumns(String id) { + if (!accessMap.containsKey(id)) { + throw new IllegalArgumentException(String.format("Field %s not found in schema", id)); } - SubtreeInterval interval = accessMap.get(dotPath); + SubtreeInfo interval = accessMap.get(id); return Collections.unmodifiableList(list.subList(interval.startTag, interval.endTag)); } - private void build(Type node, String dotPath) { - if (dotPath == null) { + public String getDotPath(String id) { + if (!accessMap.containsKey(id)) { + throw new IllegalArgumentException(String.format("Field %s not found in schema", id)); + } + return accessMap.get(id).dotPath; + } + + private void build(Type node, List path) { + if (path == null) { /* Ignore root node type name (bdec or schema) */ - dotPath = ""; - } else if (dotPath.isEmpty()) { - dotPath = node.getName(); + path = new ArrayList<>(); } else { - dotPath = concatDotPath(dotPath, node.getName()); + path.add(node.getName()); } int startTag = list.size(); if (!node.isPrimitive()) { for (Type child : node.asGroupType().getFields()) { - build(child, dotPath); + build(child, path); } } else { - list.add(dotPath); + list.add(node.getId().toString()); + } + if (!path.isEmpty() && node.getId() != null) { + accessMap.put( + node.getId().toString(), + new SubtreeInfo(startTag, list.size(), concatDotPath(path.toArray(new String[0])))); } - if (!isNullOrEmpty(dotPath)) { - accessMap.put(dotPath, new SubtreeInterval(startTag, list.size())); + if (!path.isEmpty()) { + path.remove(path.size() - 1); } } } diff --git a/src/main/java/net/snowflake/ingest/utils/Utils.java b/src/main/java/net/snowflake/ingest/utils/Utils.java index 002ce86a2..1759a9d8f 
--- a/src/main/java/net/snowflake/ingest/utils/Utils.java
+++ b/src/main/java/net/snowflake/ingest/utils/Utils.java
@@ -417,20 +417,21 @@ public static String getFullyQualifiedChannelName(
   }
 
   /**
-   * Get concat dot path, check if any path is empty or null
+   * Concatenate the given path segments into a dot path, rejecting null segments. Dots and
+   * backslashes within a segment are escaped to avoid column name collisions.
    *
    * @param path the path
    */
   public static String concatDotPath(String... path) {
     StringBuilder sb = new StringBuilder();
     for (String p : path) {
-      if (isNullOrEmpty(p)) {
-        throw new IllegalArgumentException("Path cannot be null or empty");
+      if (p == null) {
+        throw new IllegalArgumentException("Path cannot be null");
       }
       if (sb.length() > 0) {
         sb.append(".");
       }
-      sb.append(p);
+      sb.append(p.replace("\\", "\\\\").replace(".", "\\."));
     }
     return sb.toString();
   }
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParserTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParserTest.java
index 279e100fd..51a37dc00 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParserTest.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/IcebergParquetValueParserTest.java
@@ -48,13 +48,13 @@ public void setUp() {
   @Test
   public void parseValueBoolean() {
     Type type =
-        Types.primitive(PrimitiveTypeName.BOOLEAN, Repetition.OPTIONAL).named("BOOLEAN_COL");
+        Types.primitive(PrimitiveTypeName.BOOLEAN, Repetition.OPTIONAL).id(1).named("BOOLEAN_COL");
 
     RowBufferStats rowBufferStats = new RowBufferStats("BOOLEAN_COL", true, true);
     Map<String, RowBufferStats> rowBufferStatsMap =
         new HashMap<String, RowBufferStats>() {
           {
-            put("BOOLEAN_COL", rowBufferStats);
+            put("1", rowBufferStats);
           }
         };
 
@@ -73,13 +73,14 @@ public void parseValueBoolean() {
   @Test
   public void parseValueInt() {
-    Type type = Types.primitive(PrimitiveTypeName.INT32, Repetition.OPTIONAL).named("INT_COL");
+    Type type =
+        Types.primitive(PrimitiveTypeName.INT32, Repetition.OPTIONAL).id(1).named("INT_COL");
 
     RowBufferStats rowBufferStats = new RowBufferStats("INT_COL", true, true);
     Map<String, RowBufferStats> rowBufferStatsMap =
         new HashMap<String, RowBufferStats>() {
           {
-            put("INT_COL", rowBufferStats);
+            put("1", rowBufferStats);
           }
         };
     ParquetBufferValue pv =
@@ -99,6 +100,7 @@ public void parseValueInt() {
   public void parseValueDecimalToInt() {
     Type type =
         Types.primitive(PrimitiveTypeName.INT32, Repetition.OPTIONAL)
+            .id(1)
             .as(LogicalTypeAnnotation.decimalType(4, 9))
             .named("DECIMAL_COL");
 
     RowBufferStats rowBufferStats = new RowBufferStats("DECIMAL_COL", true, true);
     Map<String, RowBufferStats> rowBufferStatsMap =
         new HashMap<String, RowBufferStats>() {
           {
-            put("DECIMAL_COL", rowBufferStats);
+            put("1", rowBufferStats);
           }
         };
     ParquetBufferValue pv =
@@ -126,6 +128,7 @@ public void parseValueDecimalToInt() {
   public void parseValueDateToInt() {
     Type type =
         Types.primitive(PrimitiveTypeName.INT32, Repetition.OPTIONAL)
+            .id(1)
             .as(LogicalTypeAnnotation.dateType())
             .named("DATE_COL");
 
     RowBufferStats rowBufferStats = new RowBufferStats("DATE_COL", true, true);
     Map<String, RowBufferStats> rowBufferStatsMap =
         new HashMap<String, RowBufferStats>() {
           {
-            put("DATE_COL", rowBufferStats);
+            put("1", rowBufferStats);
           }
         };
     ParquetBufferValue pv =
@@ -151,13 +154,14 @@ public void parseValueDateToInt() {
   @Test
   public void parseValueLong() {
-    Type type = Types.primitive(PrimitiveTypeName.INT64, Repetition.OPTIONAL).named("LONG_COL");
+    Type type =
+        Types.primitive(PrimitiveTypeName.INT64, Repetition.OPTIONAL).id(1).named("LONG_COL");
 
     RowBufferStats rowBufferStats = new RowBufferStats("LONG_COL", true, true);
     Map<String, RowBufferStats> rowBufferStatsMap =
         new HashMap<String, RowBufferStats>() {
           {
-            put("LONG_COL", rowBufferStats);
+            put("1", rowBufferStats);
           }
         };
     ParquetBufferValue pv =
@@ -177,6 +181,7 @@ public void parseValueLong() {
   public void parseValueDecimalToLong() {
     Type type =
         Types.primitive(PrimitiveTypeName.INT64, Repetition.OPTIONAL)
+            .id(1)
             .as(LogicalTypeAnnotation.decimalType(9, 18))
             .named("DECIMAL_COL");
 
@@ -184,7 +189,7 @@ public void parseValueDecimalToLong() {
     Map<String, RowBufferStats> rowBufferStatsMap =
         new HashMap<String, RowBufferStats>() {
           {
-            put("DECIMAL_COL", rowBufferStats);
+            put("1", rowBufferStats);
           }
         };
     ParquetBufferValue pv =
@@ -209,6 +214,7 @@ public void parseValueDecimalToLong() {
   public void parseValueTimeToLong() {
     Type type =
         Types.primitive(PrimitiveTypeName.INT64, Repetition.OPTIONAL)
+            .id(1)
             .as(LogicalTypeAnnotation.timeType(false, LogicalTypeAnnotation.TimeUnit.MICROS))
             .named("TIME_COL");
 
@@ -216,7 +222,7 @@ public void parseValueTimeToLong() {
     Map<String, RowBufferStats> rowBufferStatsMap =
         new HashMap<String, RowBufferStats>() {
           {
-            put("TIME_COL", rowBufferStats);
+            put("1", rowBufferStats);
           }
         };
     ParquetBufferValue pv =
@@ -236,6 +242,7 @@ public void parseValueTimeToLong() {
   public void parseValueTimestampToLong() {
     Type type =
         Types.primitive(PrimitiveTypeName.INT64, Repetition.OPTIONAL)
+            .id(1)
             .as(LogicalTypeAnnotation.timestampType(false, LogicalTypeAnnotation.TimeUnit.MICROS))
             .named("TIMESTAMP_COL");
 
@@ -243,7 +250,7 @@ public void parseValueTimestampToLong() {
     Map<String, RowBufferStats> rowBufferStatsMap =
         new HashMap<String, RowBufferStats>() {
           {
-            put("TIMESTAMP_COL", rowBufferStats);
+            put("1", rowBufferStats);
           }
         };
     ParquetBufferValue pv =
@@ -263,6 +270,7 @@ public void parseValueTimestampToLong() {
   public void parseValueTimestampTZToLong() {
     Type type =
         Types.primitive(PrimitiveTypeName.INT64, Repetition.OPTIONAL)
+            .id(1)
             .as(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MICROS))
             .named("TIMESTAMP_TZ_COL");
 
@@ -270,7 +278,7 @@ public void parseValueTimestampTZToLong() {
     Map<String, RowBufferStats> rowBufferStatsMap =
         new HashMap<String, RowBufferStats>() {
           {
-            put("TIMESTAMP_TZ_COL", rowBufferStats);
+            put("1", rowBufferStats);
           }
         };
     ParquetBufferValue pv =
@@ -288,13 +296,14 @@ public void parseValueTimestampTZToLong() {
 
   @Test
   public void parseValueFloat() {
-    Type type = Types.primitive(PrimitiveTypeName.FLOAT, Repetition.OPTIONAL).named("FLOAT_COL");
+    Type type =
+        Types.primitive(PrimitiveTypeName.FLOAT, Repetition.OPTIONAL).id(1).named("FLOAT_COL");
 
     RowBufferStats rowBufferStats = new RowBufferStats("FLOAT_COL", true, true);
     Map<String, RowBufferStats> rowBufferStatsMap =
         new HashMap<String, RowBufferStats>() {
           {
-            put("FLOAT_COL", rowBufferStats);
+            put("1", rowBufferStats);
           }
         };
     ParquetBufferValue pv =
@@ -312,13 +321,14 @@ public void parseValueFloat() {
 
   @Test
   public void parseValueDouble() {
-    Type type = Types.primitive(PrimitiveTypeName.DOUBLE, Repetition.OPTIONAL).named("DOUBLE_COL");
+    Type type =
+        Types.primitive(PrimitiveTypeName.DOUBLE, Repetition.OPTIONAL).id(1).named("DOUBLE_COL");
 
     RowBufferStats rowBufferStats = new RowBufferStats("DOUBLE_COL", true, true);
     Map<String, RowBufferStats> rowBufferStatsMap =
         new HashMap<String, RowBufferStats>() {
           {
-            put("DOUBLE_COL", rowBufferStats);
+            put("1", rowBufferStats);
           }
         };
     ParquetBufferValue pv =
@@ -336,13 +346,14 @@ public void parseValueDouble() {
 
   @Test
   public void parseValueBinary() {
-    Type type = Types.primitive(PrimitiveTypeName.BINARY, Repetition.OPTIONAL).named("BINARY_COL");
+    Type type =
+        Types.primitive(PrimitiveTypeName.BINARY, Repetition.OPTIONAL).id(1).named("BINARY_COL");
 
     RowBufferStats rowBufferStats = new RowBufferStats("BINARY_COL", true, true);
     Map<String, RowBufferStats> rowBufferStatsMap =
         new HashMap<String, RowBufferStats>() {
           {
-            put("BINARY_COL", rowBufferStats);
+            put("1", rowBufferStats);
           }
         };
     byte[] value = "snowflake_to_the_moon".getBytes();
@@ -364,6 +375,7 @@ public void parseValueBinary() {
   public void parseValueStringToBinary() {
     Type type =
         Types.primitive(PrimitiveTypeName.BINARY, Repetition.OPTIONAL)
+            .id(1)
             .as(LogicalTypeAnnotation.stringType())
             .named("BINARY_COL");
 
@@ -371,7 +383,7 @@ public void parseValueStringToBinary() {
     Map<String, RowBufferStats> rowBufferStatsMap =
         new HashMap<String, RowBufferStats>() {
           {
-            put("BINARY_COL", rowBufferStats);
+            put("1", rowBufferStats);
           }
         };
     String value = "snowflake_to_the_moon";
@@ -395,6 +407,7 @@ public void parseValueStringToBinary() {
   public void parseValueFixed() {
     Type type =
         Types.primitive(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, Repetition.OPTIONAL)
+            .id(1)
             .length(4)
             .named("FIXED_COL");
 
@@ -402,7 +415,7 @@ public void parseValueFixed() {
     Map<String, RowBufferStats> rowBufferStatsMap =
         new HashMap<String, RowBufferStats>() {
           {
-            put("FIXED_COL", rowBufferStats);
+            put("1", rowBufferStats);
           }
         };
     byte[] value = "snow".getBytes();
@@ -424,6 +437,7 @@ public void parseValueFixed() {
   public void parseValueDecimalToFixed() {
     Type type =
         Types.primitive(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, Repetition.OPTIONAL)
+            .id(1)
             .length(9)
             .as(LogicalTypeAnnotation.decimalType(10, 20))
             .named("FIXED_COL");
 
@@ -432,7 +446,7 @@ public void parseValueDecimalToFixed() {
     Map<String, RowBufferStats> rowBufferStatsMap =
         new HashMap<String, RowBufferStats>() {
           {
-            put("FIXED_COL", rowBufferStats);
+            put("1", rowBufferStats);
           }
         };
     BigDecimal value = new BigDecimal("1234567890.0123456789");
@@ -454,13 +468,14 @@ public void parseValueDecimalToFixed() {
   public void parseList() throws JsonProcessingException {
     Type list =
         Types.optionalList()
-            .element(Types.optional(PrimitiveTypeName.INT32).named("element"))
+            .element(Types.optional(PrimitiveTypeName.INT32).id(2).named("element"))
+            .id(1)
             .named("LIST_COL");
     RowBufferStats rowBufferStats = new RowBufferStats("LIST_COL.list.element", true, true);
     Map<String, RowBufferStats> rowBufferStatsMap =
         new HashMap<String, RowBufferStats>() {
           {
-            put("LIST_COL.list.element", rowBufferStats);
+            put("2", rowBufferStats);
           }
         };
 
@@ -485,7 +500,8 @@ public void parseList() throws JsonProcessingException {
     /* Test required list */
     Type requiredList =
         Types.requiredList()
-            .element(Types.optional(PrimitiveTypeName.INT32).named("element"))
+            .element(Types.optional(PrimitiveTypeName.INT32).id(2).named("element"))
+            .id(1)
             .named("LIST_COL");
     Assert.assertThrows(
         SFException.class,
@@ -508,7 +524,8 @@ public void parseList() throws JsonProcessingException {
     /* Test required list with required elements */
     Type requiredElements =
         Types.requiredList()
-            .element(Types.required(PrimitiveTypeName.INT32).named("element"))
+            .element(Types.required(PrimitiveTypeName.INT32).id(1).named("element"))
+            .id(2)
             .named("LIST_COL");
     Assert.assertThrows(
         SFException.class,
@@ -526,16 +543,17 @@ public void parseList() throws JsonProcessingException {
   public void parseMap() throws JsonProcessingException {
     Type map =
         Types.optionalMap()
-            .key(Types.required(PrimitiveTypeName.INT32).named("key"))
-            .value(Types.optional(PrimitiveTypeName.INT32).named("value"))
+            .key(Types.required(PrimitiveTypeName.INT32).id(2).named("key"))
+            .value(Types.optional(PrimitiveTypeName.INT32).id(3).named("value"))
+            .id(1)
             .named("MAP_COL");
     RowBufferStats rowBufferKeyStats = new RowBufferStats("MAP_COL.key_value.key", true, true);
     RowBufferStats rowBufferValueStats = new RowBufferStats("MAP_COL.key_value.value", true, true);
     Map<String, RowBufferStats> rowBufferStatsMap =
         new HashMap<String, RowBufferStats>() {
           {
-            put("MAP_COL.key_value.key", rowBufferKeyStats);
-            put("MAP_COL.key_value.value", rowBufferValueStats);
+            put("2", rowBufferKeyStats);
+            put("3", rowBufferValueStats);
           }
         };
     IcebergParquetValueParser.parseColumnValueToParquet(
@@ -570,8 +588,9 @@ public void parseMap() throws JsonProcessingException {
     /* Test required map */
     Type requiredMap =
         Types.requiredMap()
-            .key(Types.required(PrimitiveTypeName.INT32).named("key"))
-            .value(Types.optional(PrimitiveTypeName.INT32).named("value"))
+            .key(Types.required(PrimitiveTypeName.INT32).id(2).named("key"))
+            .value(Types.optional(PrimitiveTypeName.INT32).id(3).named("value"))
+            .id(1)
             .named("MAP_COL");
     Assert.assertThrows(
         SFException.class,
@@ -599,8 +618,9 @@ public void parseMap() throws JsonProcessingException {
     /* Test required map with required values */
     Type requiredValues =
         Types.requiredMap()
-            .key(Types.required(PrimitiveTypeName.INT32).named("key"))
-            .value(Types.required(PrimitiveTypeName.INT32).named("value"))
+            .key(Types.required(PrimitiveTypeName.INT32).id(2).named("key"))
+            .value(Types.required(PrimitiveTypeName.INT32).id(3).named("value"))
+            .id(1)
             .named("MAP_COL");
     Assert.assertThrows(
         SFException.class,
@@ -622,11 +642,13 @@ public void parseMap() throws JsonProcessingException {
   public void parseStruct() throws JsonProcessingException {
     Type struct =
         Types.optionalGroup()
-            .addField(Types.optional(PrimitiveTypeName.INT32).named("a"))
+            .addField(Types.optional(PrimitiveTypeName.INT32).id(2).named("a"))
             .addField(
                 Types.required(PrimitiveTypeName.BINARY)
+                    .id(3)
                     .as(LogicalTypeAnnotation.stringType())
                     .named("b"))
+            .id(1)
             .named("STRUCT_COL");
 
     RowBufferStats rowBufferAStats = new RowBufferStats("STRUCT_COL.a", true, true);
@@ -634,8 +656,8 @@ public void parseStruct() throws JsonProcessingException {
     Map<String, RowBufferStats> rowBufferStatsMap =
         new HashMap<String, RowBufferStats>() {
           {
-            put("STRUCT_COL.a", rowBufferAStats);
-            put("STRUCT_COL.b", rowBufferBStats);
+            put("2", rowBufferAStats);
+            put("3", rowBufferBStats);
           }
         };
 
@@ -695,11 +717,13 @@ public void parseStruct() throws JsonProcessingException {
     /* Test required struct */
     Type requiredStruct =
         Types.requiredGroup()
-            .addField(Types.optional(PrimitiveTypeName.INT32).named("a"))
+            .addField(Types.optional(PrimitiveTypeName.INT32).id(2).named("a"))
             .addField(
                 Types.optional(PrimitiveTypeName.BINARY)
+                    .id(3)
                     .as(LogicalTypeAnnotation.stringType())
                     .named("b"))
+            .id(1)
             .named("STRUCT_COL");
     Assert.assertThrows(
         SFException.class,
@@ -749,8 +773,8 @@ public void parseNestedTypes() {
 
   private static Type generateNestedTypeAndStats(
       int depth, String name, Map<String, RowBufferStats> rowBufferStatsMap, String path) {
     if (depth == 0) {
-      rowBufferStatsMap.put(path, new RowBufferStats(path, true, true));
-      return Types.optional(PrimitiveTypeName.INT32).named(name);
+      rowBufferStatsMap.put("0", new RowBufferStats(path, true, true));
+      return Types.optional(PrimitiveTypeName.INT32).id(0).named(name);
     }
     switch (depth % 3) {
       case 1:
@@ -758,19 +782,22 @@ private static Type generateNestedTypeAndStats(
             .element(
                 generateNestedTypeAndStats(
                     depth - 1, "element", rowBufferStatsMap, path + ".list.element"))
+            .id(1)
             .named(name);
       case 2:
         return Types.optionalGroup()
             .addField(generateNestedTypeAndStats(depth - 1, "a", rowBufferStatsMap, path + ".a"))
+            .id(1)
             .named(name);
       case 0:
         rowBufferStatsMap.put(
-            path + ".key_value.key", new RowBufferStats(path + ".key_value.key", true, true));
+            String.valueOf(depth), new RowBufferStats(path + ".key_value.key", true, true));
         return Types.optionalMap()
-            .key(Types.required(PrimitiveTypeName.INT32).named("key"))
+            .key(Types.required(PrimitiveTypeName.INT32).id(depth).named("key"))
             .value(
                 generateNestedTypeAndStats(
                     depth - 1, "value", rowBufferStatsMap, path + ".key_value.value"))
+            .id(1)
             .named(name);
     }
     return null;
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java
index 747773e9e..8822363a8 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/RowBufferTest.java
@@ -321,12 +321,14 @@ public void buildFieldErrorStates() {
 
   @Test
   public void testReset() {
-    RowBufferStats stats = this.rowBufferOnErrorContinue.statsMap.get("COLCHAR");
+    RowBufferStats stats =
+        this.rowBufferOnErrorContinue.statsMap.get(enableIcebergStreaming ? "1" : "COLCHAR");
     stats.addIntValue(BigInteger.valueOf(1));
     Assert.assertEquals(BigInteger.valueOf(1), stats.getCurrentMaxIntValue());
     Assert.assertNull(stats.getCollationDefinitionString());
     this.rowBufferOnErrorContinue.reset();
-    RowBufferStats resetStats = this.rowBufferOnErrorContinue.statsMap.get("COLCHAR");
+    RowBufferStats resetStats =
+        this.rowBufferOnErrorContinue.statsMap.get(enableIcebergStreaming ? "1" : "COLCHAR");
     Assert.assertNotNull(resetStats);
     Assert.assertNull(resetStats.getCurrentMaxIntValue());
     Assert.assertNull(resetStats.getCollationDefinitionString());
@@ -872,51 +874,76 @@ private void testStatsE2EHelper(AbstractRowBuffer rowBuffer) {
     Map<String, RowBufferStats> columnEpStats = result.getColumnEps();
 
     Assert.assertEquals(
-        BigInteger.valueOf(11), columnEpStats.get("colTinyInt").getCurrentMaxIntValue());
+        BigInteger.valueOf(11),
+        columnEpStats.get(enableIcebergStreaming ? "1" : "colTinyInt").getCurrentMaxIntValue());
     Assert.assertEquals(
-        BigInteger.valueOf(10), columnEpStats.get("colTinyInt").getCurrentMinIntValue());
-    Assert.assertEquals(0, columnEpStats.get("colTinyInt").getCurrentNullCount());
+        BigInteger.valueOf(10),
+        columnEpStats.get(enableIcebergStreaming ? "1" : "colTinyInt").getCurrentMinIntValue());
     Assert.assertEquals(
-        enableIcebergStreaming ? 2 : -1, columnEpStats.get("colTinyInt").getDistinctValues());
+        0, columnEpStats.get(enableIcebergStreaming ? "1" : "colTinyInt").getCurrentNullCount());
+    Assert.assertEquals(
+        enableIcebergStreaming ? 2 : -1,
+        columnEpStats.get(enableIcebergStreaming ? "1" : "colTinyInt").getDistinctValues());
 
     Assert.assertEquals(
-        BigInteger.valueOf(1), columnEpStats.get("COLTINYINT").getCurrentMaxIntValue());
+        BigInteger.valueOf(1),
+        columnEpStats.get(enableIcebergStreaming ? "2" : "COLTINYINT").getCurrentMaxIntValue());
+    Assert.assertEquals(
+        BigInteger.valueOf(1),
+        columnEpStats.get(enableIcebergStreaming ? "2" : "COLTINYINT").getCurrentMinIntValue());
     Assert.assertEquals(
-        BigInteger.valueOf(1), columnEpStats.get("COLTINYINT").getCurrentMinIntValue());
-    Assert.assertEquals(0, columnEpStats.get("COLTINYINT").getCurrentNullCount());
+        0, columnEpStats.get(enableIcebergStreaming ? "2" : "COLTINYINT").getCurrentNullCount());
     Assert.assertEquals(
-        enableIcebergStreaming ? 1 : -1, columnEpStats.get("COLTINYINT").getDistinctValues());
+        enableIcebergStreaming ? 1 : -1,
+        columnEpStats.get(enableIcebergStreaming ? "2" : "COLTINYINT").getDistinctValues());
 
     Assert.assertEquals(
-        BigInteger.valueOf(3), columnEpStats.get("COLSMALLINT").getCurrentMaxIntValue());
+        BigInteger.valueOf(3),
+        columnEpStats.get(enableIcebergStreaming ? "3" : "COLSMALLINT").getCurrentMaxIntValue());
+    Assert.assertEquals(
+        BigInteger.valueOf(2),
+        columnEpStats.get(enableIcebergStreaming ? "3" : "COLSMALLINT").getCurrentMinIntValue());
"3" : "COLSMALLINT").getCurrentMinIntValue()); Assert.assertEquals( - BigInteger.valueOf(2), columnEpStats.get("COLSMALLINT").getCurrentMinIntValue()); - Assert.assertEquals(0, columnEpStats.get("COLSMALLINT").getCurrentNullCount()); + 0, columnEpStats.get(enableIcebergStreaming ? "3" : "COLSMALLINT").getCurrentNullCount()); Assert.assertEquals( - enableIcebergStreaming ? 2 : -1, columnEpStats.get("COLSMALLINT").getDistinctValues()); + enableIcebergStreaming ? 2 : -1, + columnEpStats.get(enableIcebergStreaming ? "3" : "COLSMALLINT").getDistinctValues()); - Assert.assertEquals(BigInteger.valueOf(3), columnEpStats.get("COLINT").getCurrentMaxIntValue()); - Assert.assertEquals(BigInteger.valueOf(3), columnEpStats.get("COLINT").getCurrentMinIntValue()); - Assert.assertEquals(1L, columnEpStats.get("COLINT").getCurrentNullCount()); Assert.assertEquals( - enableIcebergStreaming ? 1 : -1, columnEpStats.get("COLINT").getDistinctValues()); + BigInteger.valueOf(3), + columnEpStats.get(enableIcebergStreaming ? "4" : "COLINT").getCurrentMaxIntValue()); + Assert.assertEquals( + BigInteger.valueOf(3), + columnEpStats.get(enableIcebergStreaming ? "4" : "COLINT").getCurrentMinIntValue()); + Assert.assertEquals( + 1L, columnEpStats.get(enableIcebergStreaming ? "4" : "COLINT").getCurrentNullCount()); + Assert.assertEquals( + enableIcebergStreaming ? 1 : -1, + columnEpStats.get(enableIcebergStreaming ? "4" : "COLINT").getDistinctValues()); Assert.assertEquals( - BigInteger.valueOf(40), columnEpStats.get("COLBIGINT").getCurrentMaxIntValue()); + BigInteger.valueOf(40), + columnEpStats.get(enableIcebergStreaming ? "5" : "COLBIGINT").getCurrentMaxIntValue()); Assert.assertEquals( - BigInteger.valueOf(4), columnEpStats.get("COLBIGINT").getCurrentMinIntValue()); - Assert.assertEquals(0, columnEpStats.get("COLBIGINT").getCurrentNullCount()); + BigInteger.valueOf(4), + columnEpStats.get(enableIcebergStreaming ? "5" : "COLBIGINT").getCurrentMinIntValue()); Assert.assertEquals( - enableIcebergStreaming ? 2 : -1, columnEpStats.get("COLBIGINT").getDistinctValues()); + 0, columnEpStats.get(enableIcebergStreaming ? "5" : "COLBIGINT").getCurrentNullCount()); + Assert.assertEquals( + enableIcebergStreaming ? 2 : -1, + columnEpStats.get(enableIcebergStreaming ? "5" : "COLBIGINT").getDistinctValues()); Assert.assertArrayEquals( - "2".getBytes(StandardCharsets.UTF_8), columnEpStats.get("COLCHAR").getCurrentMinStrValue()); + "2".getBytes(StandardCharsets.UTF_8), + columnEpStats.get(enableIcebergStreaming ? "7" : "COLCHAR").getCurrentMinStrValue()); Assert.assertArrayEquals( "alice".getBytes(StandardCharsets.UTF_8), - columnEpStats.get("COLCHAR").getCurrentMaxStrValue()); - Assert.assertEquals(0, columnEpStats.get("COLCHAR").getCurrentNullCount()); + columnEpStats.get(enableIcebergStreaming ? "7" : "COLCHAR").getCurrentMaxStrValue()); + Assert.assertEquals( + 0, columnEpStats.get(enableIcebergStreaming ? "7" : "COLCHAR").getCurrentNullCount()); Assert.assertEquals( - enableIcebergStreaming ? 2 : -1, columnEpStats.get("COLCHAR").getDistinctValues()); + enableIcebergStreaming ? 2 : -1, + columnEpStats.get(enableIcebergStreaming ? 
"7" : "COLCHAR").getDistinctValues()); // Confirm we reset ChannelData resetResults = rowBuffer.flush(); @@ -950,7 +977,7 @@ private void testStatsE2ETimestampHelper(OpenChannelRequest.OnErrorOption onErro colTimestampLtzSB16.setScale(9); ColumnMetadata colTimestampLtzSB16Scale6 = new ColumnMetadata(); - colTimestampLtzSB16Scale6.setOrdinal(2); + colTimestampLtzSB16Scale6.setOrdinal(3); colTimestampLtzSB16Scale6.setName("COLTIMESTAMPLTZ_SB16_SCALE6"); colTimestampLtzSB16Scale6.setPhysicalType("SB16"); colTimestampLtzSB16Scale6.setNullable(true); @@ -989,10 +1016,16 @@ private void testStatsE2ETimestampHelper(OpenChannelRequest.OnErrorOption onErro Assert.assertEquals( BigInteger.valueOf(1621899220 * (enableIcebergStreaming ? 1000000L : 1)), - result.getColumnEps().get("COLTIMESTAMPLTZ_SB8").getCurrentMinIntValue()); + result + .getColumnEps() + .get(enableIcebergStreaming ? "1" : "COLTIMESTAMPLTZ_SB8") + .getCurrentMinIntValue()); Assert.assertEquals( BigInteger.valueOf(1621899221 * (enableIcebergStreaming ? 1000000L : 1)), - result.getColumnEps().get("COLTIMESTAMPLTZ_SB8").getCurrentMaxIntValue()); + result + .getColumnEps() + .get(enableIcebergStreaming ? "1" : "COLTIMESTAMPLTZ_SB8") + .getCurrentMaxIntValue()); /* Iceberg only supports microsecond precision for TIMESTAMP_LTZ */ if (!enableIcebergStreaming) { @@ -1008,14 +1041,29 @@ private void testStatsE2ETimestampHelper(OpenChannelRequest.OnErrorOption onErro Assert.assertEquals( new BigInteger("1621899220123456"), - result.getColumnEps().get("COLTIMESTAMPLTZ_SB16_SCALE6").getCurrentMinIntValue()); + result + .getColumnEps() + .get(enableIcebergStreaming ? "3" : "COLTIMESTAMPLTZ_SB16_SCALE6") + .getCurrentMinIntValue()); Assert.assertEquals( new BigInteger("1621899220123457"), - result.getColumnEps().get("COLTIMESTAMPLTZ_SB16_SCALE6").getCurrentMaxIntValue()); + result + .getColumnEps() + .get(enableIcebergStreaming ? "3" : "COLTIMESTAMPLTZ_SB16_SCALE6") + .getCurrentMaxIntValue()); - Assert.assertEquals(1, result.getColumnEps().get("COLTIMESTAMPLTZ_SB8").getCurrentNullCount()); Assert.assertEquals( - 1, result.getColumnEps().get("COLTIMESTAMPLTZ_SB16_SCALE6").getCurrentNullCount()); + 1, + result + .getColumnEps() + .get(enableIcebergStreaming ? "1" : "COLTIMESTAMPLTZ_SB8") + .getCurrentNullCount()); + Assert.assertEquals( + 1, + result + .getColumnEps() + .get(enableIcebergStreaming ? "1" : "COLTIMESTAMPLTZ_SB16_SCALE6") + .getCurrentNullCount()); } @Test @@ -1061,11 +1109,21 @@ private void testE2EDateHelper(OpenChannelRequest.OnErrorOption onErrorOption) { Assert.assertEquals(3, result.getRowCount()); Assert.assertEquals( - BigInteger.valueOf(18772), result.getColumnEps().get("COLDATE").getCurrentMinIntValue()); + BigInteger.valueOf(18772), + result + .getColumnEps() + .get(enableIcebergStreaming ? "1" : "COLDATE") + .getCurrentMinIntValue()); Assert.assertEquals( - BigInteger.valueOf(18773), result.getColumnEps().get("COLDATE").getCurrentMaxIntValue()); + BigInteger.valueOf(18773), + result + .getColumnEps() + .get(enableIcebergStreaming ? "1" : "COLDATE") + .getCurrentMaxIntValue()); - Assert.assertEquals(1, result.getColumnEps().get("COLDATE").getCurrentNullCount()); + Assert.assertEquals( + 1, + result.getColumnEps().get(enableIcebergStreaming ? 
"1" : "COLDATE").getCurrentNullCount()); } @Test @@ -1147,35 +1205,79 @@ private void testE2ETimeHelper(OpenChannelRequest.OnErrorOption onErrorOption) { if (enableIcebergStreaming) { Assert.assertEquals( BigInteger.valueOf(10 * 60 * 60 * 1000000L), - result.getColumnEps().get("COLTIMESB4").getCurrentMinIntValue()); + result + .getColumnEps() + .get(enableIcebergStreaming ? "1" : "COLTIMESB4") + .getCurrentMinIntValue()); Assert.assertEquals( BigInteger.valueOf((11 * 60 * 60 + 15 * 60) * 1000000L), - result.getColumnEps().get("COLTIMESB4").getCurrentMaxIntValue()); - Assert.assertEquals(1, result.getColumnEps().get("COLTIMESB4").getCurrentNullCount()); + result + .getColumnEps() + .get(enableIcebergStreaming ? "1" : "COLTIMESB4") + .getCurrentMaxIntValue()); + Assert.assertEquals( + 1, + result + .getColumnEps() + .get(enableIcebergStreaming ? "1" : "COLTIMESB4") + .getCurrentNullCount()); Assert.assertEquals( BigInteger.valueOf((10 * 60 * 60 * 1000L + 123) * 1000L), - result.getColumnEps().get("COLTIMESB8").getCurrentMinIntValue()); + result + .getColumnEps() + .get(enableIcebergStreaming ? "2" : "COLTIMESB8") + .getCurrentMinIntValue()); Assert.assertEquals( BigInteger.valueOf((11 * 60 * 60 * 1000L + 15 * 60 * 1000 + 456) * 1000L), - result.getColumnEps().get("COLTIMESB8").getCurrentMaxIntValue()); - Assert.assertEquals(1, result.getColumnEps().get("COLTIMESB8").getCurrentNullCount()); + result + .getColumnEps() + .get(enableIcebergStreaming ? "2" : "COLTIMESB8") + .getCurrentMaxIntValue()); + Assert.assertEquals( + 1, + result + .getColumnEps() + .get(enableIcebergStreaming ? "2" : "COLTIMESB8") + .getCurrentNullCount()); } else { Assert.assertEquals( BigInteger.valueOf(10 * 60 * 60), - result.getColumnEps().get("COLTIMESB4").getCurrentMinIntValue()); + result + .getColumnEps() + .get(enableIcebergStreaming ? "1" : "COLTIMESB4") + .getCurrentMinIntValue()); Assert.assertEquals( BigInteger.valueOf(11 * 60 * 60 + 15 * 60), - result.getColumnEps().get("COLTIMESB4").getCurrentMaxIntValue()); - Assert.assertEquals(1, result.getColumnEps().get("COLTIMESB4").getCurrentNullCount()); + result + .getColumnEps() + .get(enableIcebergStreaming ? "1" : "COLTIMESB4") + .getCurrentMaxIntValue()); + Assert.assertEquals( + 1, + result + .getColumnEps() + .get(enableIcebergStreaming ? "1" : "COLTIMESB4") + .getCurrentNullCount()); Assert.assertEquals( BigInteger.valueOf(10 * 60 * 60 * 1000L + 123), - result.getColumnEps().get("COLTIMESB8").getCurrentMinIntValue()); + result + .getColumnEps() + .get(enableIcebergStreaming ? "2" : "COLTIMESB8") + .getCurrentMinIntValue()); Assert.assertEquals( BigInteger.valueOf(11 * 60 * 60 * 1000L + 15 * 60 * 1000 + 456), - result.getColumnEps().get("COLTIMESB8").getCurrentMaxIntValue()); - Assert.assertEquals(1, result.getColumnEps().get("COLTIMESB8").getCurrentNullCount()); + result + .getColumnEps() + .get(enableIcebergStreaming ? "2" : "COLTIMESB8") + .getCurrentMaxIntValue()); + Assert.assertEquals( + 1, + result + .getColumnEps() + .get(enableIcebergStreaming ? "2" : "COLTIMESB8") + .getCurrentNullCount()); } } @@ -1394,9 +1496,12 @@ private void doTestFailureHalfwayThroughColumnProcessing( } ChannelData channelData = innerBuffer.flush(); - RowBufferStats statsCol1 = channelData.getColumnEps().get("COLVARCHAR1"); - RowBufferStats statsCol2 = channelData.getColumnEps().get("COLVARCHAR2"); - RowBufferStats statsCol3 = channelData.getColumnEps().get("COLBOOLEAN1"); + RowBufferStats statsCol1 = + channelData.getColumnEps().get(enableIcebergStreaming ? 
"1" : "COLVARCHAR1"); + RowBufferStats statsCol2 = + channelData.getColumnEps().get(enableIcebergStreaming ? "2" : "COLVARCHAR2"); + RowBufferStats statsCol3 = + channelData.getColumnEps().get(enableIcebergStreaming ? "3" : "COLBOOLEAN1"); Assert.assertEquals(1, channelData.getRowCount()); Assert.assertEquals(0, statsCol1.getCurrentNullCount()); Assert.assertEquals(0, statsCol2.getCurrentNullCount()); @@ -1460,10 +1565,23 @@ private void testE2EBooleanHelper(OpenChannelRequest.OnErrorOption onErrorOption Assert.assertEquals(3, result.getRowCount()); Assert.assertEquals( - BigInteger.valueOf(0), result.getColumnEps().get("COLBOOLEAN").getCurrentMinIntValue()); + BigInteger.valueOf(0), + result + .getColumnEps() + .get(enableIcebergStreaming ? "1" : "COLBOOLEAN") + .getCurrentMinIntValue()); Assert.assertEquals( - BigInteger.valueOf(1), result.getColumnEps().get("COLBOOLEAN").getCurrentMaxIntValue()); - Assert.assertEquals(1, result.getColumnEps().get("COLBOOLEAN").getCurrentNullCount()); + BigInteger.valueOf(1), + result + .getColumnEps() + .get(enableIcebergStreaming ? "1" : "COLBOOLEAN") + .getCurrentMaxIntValue()); + Assert.assertEquals( + 1, + result + .getColumnEps() + .get(enableIcebergStreaming ? "1" : "COLBOOLEAN") + .getCurrentNullCount()); } @Test @@ -1517,14 +1635,30 @@ private void testE2EBinaryHelper(OpenChannelRequest.OnErrorOption onErrorOption) ChannelData result = innerBuffer.flush(); Assert.assertEquals(3, result.getRowCount()); - Assert.assertEquals(11L, result.getColumnEps().get("COLBINARY").getCurrentMaxLength()); + Assert.assertEquals( + 11L, + result + .getColumnEps() + .get(enableIcebergStreaming ? "1" : "COLBINARY") + .getCurrentMaxLength()); Assert.assertArrayEquals( "Hello World".getBytes(StandardCharsets.UTF_8), - result.getColumnEps().get("COLBINARY").getCurrentMinStrValue()); + result + .getColumnEps() + .get(enableIcebergStreaming ? "1" : "COLBINARY") + .getCurrentMinStrValue()); Assert.assertArrayEquals( "Honk Honk".getBytes(StandardCharsets.UTF_8), - result.getColumnEps().get("COLBINARY").getCurrentMaxStrValue()); - Assert.assertEquals(1, result.getColumnEps().get("COLBINARY").getCurrentNullCount()); + result + .getColumnEps() + .get(enableIcebergStreaming ? "1" : "COLBINARY") + .getCurrentMaxStrValue()); + Assert.assertEquals( + 1, + result + .getColumnEps() + .get(enableIcebergStreaming ? "1" : "COLBINARY") + .getCurrentNullCount()); } @Test @@ -1570,10 +1704,20 @@ private void testE2ERealHelper(OpenChannelRequest.OnErrorOption onErrorOption) { Assert.assertEquals(3, result.getRowCount()); Assert.assertEquals( - Double.valueOf(123.456), result.getColumnEps().get("COLREAL").getCurrentMinRealValue()); + Double.valueOf(123.456), + result + .getColumnEps() + .get(enableIcebergStreaming ? "1" : "COLREAL") + .getCurrentMinRealValue()); Assert.assertEquals( - Double.valueOf(123.4567), result.getColumnEps().get("COLREAL").getCurrentMaxRealValue()); - Assert.assertEquals(1, result.getColumnEps().get("COLREAL").getCurrentNullCount()); + Double.valueOf(123.4567), + result + .getColumnEps() + .get(enableIcebergStreaming ? "1" : "COLREAL") + .getCurrentMaxRealValue()); + Assert.assertEquals( + 1, + result.getColumnEps().get(enableIcebergStreaming ? 
"1" : "COLREAL").getCurrentNullCount()); } @Test @@ -1600,11 +1744,29 @@ public void testOnErrorAbortFailures() { Assert.assertEquals(1, innerBuffer.bufferedRowCount); Assert.assertEquals(0, innerBuffer.getTempRowCount()); Assert.assertEquals( - 1, innerBuffer.statsMap.get("COLDECIMAL").getCurrentMaxIntValue().intValue()); + 1, + innerBuffer + .statsMap + .get(enableIcebergStreaming ? "1" : "COLDECIMAL") + .getCurrentMaxIntValue() + .intValue()); Assert.assertEquals( - 1, innerBuffer.statsMap.get("COLDECIMAL").getCurrentMinIntValue().intValue()); - Assert.assertNull(innerBuffer.tempStatsMap.get("COLDECIMAL").getCurrentMaxIntValue()); - Assert.assertNull(innerBuffer.tempStatsMap.get("COLDECIMAL").getCurrentMinIntValue()); + 1, + innerBuffer + .statsMap + .get(enableIcebergStreaming ? "1" : "COLDECIMAL") + .getCurrentMinIntValue() + .intValue()); + Assert.assertNull( + innerBuffer + .tempStatsMap + .get(enableIcebergStreaming ? "1" : "COLDECIMAL") + .getCurrentMaxIntValue()); + Assert.assertNull( + innerBuffer + .tempStatsMap + .get(enableIcebergStreaming ? "1" : "COLDECIMAL") + .getCurrentMinIntValue()); Map row2 = new HashMap<>(); row2.put("COLDECIMAL", 2); @@ -1614,11 +1776,29 @@ public void testOnErrorAbortFailures() { Assert.assertEquals(2, innerBuffer.bufferedRowCount); Assert.assertEquals(0, innerBuffer.getTempRowCount()); Assert.assertEquals( - 2, innerBuffer.statsMap.get("COLDECIMAL").getCurrentMaxIntValue().intValue()); + 2, + innerBuffer + .statsMap + .get(enableIcebergStreaming ? "1" : "COLDECIMAL") + .getCurrentMaxIntValue() + .intValue()); Assert.assertEquals( - 1, innerBuffer.statsMap.get("COLDECIMAL").getCurrentMinIntValue().intValue()); - Assert.assertNull(innerBuffer.tempStatsMap.get("COLDECIMAL").getCurrentMaxIntValue()); - Assert.assertNull(innerBuffer.tempStatsMap.get("COLDECIMAL").getCurrentMinIntValue()); + 1, + innerBuffer + .statsMap + .get(enableIcebergStreaming ? "1" : "COLDECIMAL") + .getCurrentMinIntValue() + .intValue()); + Assert.assertNull( + innerBuffer + .tempStatsMap + .get(enableIcebergStreaming ? "1" : "COLDECIMAL") + .getCurrentMaxIntValue()); + Assert.assertNull( + innerBuffer + .tempStatsMap + .get(enableIcebergStreaming ? "1" : "COLDECIMAL") + .getCurrentMinIntValue()); Map row3 = new HashMap<>(); row3.put("COLDECIMAL", true); @@ -1631,11 +1811,29 @@ public void testOnErrorAbortFailures() { Assert.assertEquals(2, innerBuffer.bufferedRowCount); Assert.assertEquals(0, innerBuffer.getTempRowCount()); Assert.assertEquals( - 2, innerBuffer.statsMap.get("COLDECIMAL").getCurrentMaxIntValue().intValue()); + 2, + innerBuffer + .statsMap + .get(enableIcebergStreaming ? "1" : "COLDECIMAL") + .getCurrentMaxIntValue() + .intValue()); Assert.assertEquals( - 1, innerBuffer.statsMap.get("COLDECIMAL").getCurrentMinIntValue().intValue()); - Assert.assertNull(innerBuffer.tempStatsMap.get("COLDECIMAL").getCurrentMaxIntValue()); - Assert.assertNull(innerBuffer.tempStatsMap.get("COLDECIMAL").getCurrentMinIntValue()); + 1, + innerBuffer + .statsMap + .get(enableIcebergStreaming ? "1" : "COLDECIMAL") + .getCurrentMinIntValue() + .intValue()); + Assert.assertNull( + innerBuffer + .tempStatsMap + .get(enableIcebergStreaming ? "1" : "COLDECIMAL") + .getCurrentMaxIntValue()); + Assert.assertNull( + innerBuffer + .tempStatsMap + .get(enableIcebergStreaming ? 
"1" : "COLDECIMAL") + .getCurrentMinIntValue()); row3.put("COLDECIMAL", 3); response = innerBuffer.insertRows(Collections.singletonList(row3), "1", "3"); @@ -1643,11 +1841,29 @@ public void testOnErrorAbortFailures() { Assert.assertEquals(3, innerBuffer.bufferedRowCount); Assert.assertEquals(0, innerBuffer.getTempRowCount()); Assert.assertEquals( - 3, innerBuffer.statsMap.get("COLDECIMAL").getCurrentMaxIntValue().intValue()); + 3, + innerBuffer + .statsMap + .get(enableIcebergStreaming ? "1" : "COLDECIMAL") + .getCurrentMaxIntValue() + .intValue()); Assert.assertEquals( - 1, innerBuffer.statsMap.get("COLDECIMAL").getCurrentMinIntValue().intValue()); - Assert.assertNull(innerBuffer.tempStatsMap.get("COLDECIMAL").getCurrentMaxIntValue()); - Assert.assertNull(innerBuffer.tempStatsMap.get("COLDECIMAL").getCurrentMinIntValue()); + 1, + innerBuffer + .statsMap + .get(enableIcebergStreaming ? "1" : "COLDECIMAL") + .getCurrentMinIntValue() + .intValue()); + Assert.assertNull( + innerBuffer + .tempStatsMap + .get(enableIcebergStreaming ? "1" : "COLDECIMAL") + .getCurrentMaxIntValue()); + Assert.assertNull( + innerBuffer + .tempStatsMap + .get(enableIcebergStreaming ? "1" : "COLDECIMAL") + .getCurrentMinIntValue()); ChannelData data = innerBuffer.flush(); Assert.assertEquals(3, data.getRowCount()); @@ -1679,11 +1895,29 @@ public void testOnErrorAbortSkipBatch() { Assert.assertEquals(1, innerBuffer.bufferedRowCount); Assert.assertEquals(0, innerBuffer.getTempRowCount()); Assert.assertEquals( - 1, innerBuffer.statsMap.get("COLDECIMAL").getCurrentMaxIntValue().intValue()); + 1, + innerBuffer + .statsMap + .get(enableIcebergStreaming ? "1" : "COLDECIMAL") + .getCurrentMaxIntValue() + .intValue()); Assert.assertEquals( - 1, innerBuffer.statsMap.get("COLDECIMAL").getCurrentMinIntValue().intValue()); - Assert.assertNull(innerBuffer.tempStatsMap.get("COLDECIMAL").getCurrentMaxIntValue()); - Assert.assertNull(innerBuffer.tempStatsMap.get("COLDECIMAL").getCurrentMinIntValue()); + 1, + innerBuffer + .statsMap + .get(enableIcebergStreaming ? "1" : "COLDECIMAL") + .getCurrentMinIntValue() + .intValue()); + Assert.assertNull( + innerBuffer + .tempStatsMap + .get(enableIcebergStreaming ? "1" : "COLDECIMAL") + .getCurrentMaxIntValue()); + Assert.assertNull( + innerBuffer + .tempStatsMap + .get(enableIcebergStreaming ? "1" : "COLDECIMAL") + .getCurrentMinIntValue()); Map row2 = new HashMap<>(); row2.put("COLDECIMAL", 2); @@ -1696,20 +1930,56 @@ public void testOnErrorAbortSkipBatch() { Assert.assertEquals(1, innerBuffer.bufferedRowCount); Assert.assertEquals(0, innerBuffer.getTempRowCount()); Assert.assertEquals( - 1, innerBuffer.statsMap.get("COLDECIMAL").getCurrentMaxIntValue().intValue()); + 1, + innerBuffer + .statsMap + .get(enableIcebergStreaming ? "1" : "COLDECIMAL") + .getCurrentMaxIntValue() + .intValue()); Assert.assertEquals( - 1, innerBuffer.statsMap.get("COLDECIMAL").getCurrentMinIntValue().intValue()); - Assert.assertNull(innerBuffer.tempStatsMap.get("COLDECIMAL").getCurrentMaxIntValue()); - Assert.assertNull(innerBuffer.tempStatsMap.get("COLDECIMAL").getCurrentMinIntValue()); + 1, + innerBuffer + .statsMap + .get(enableIcebergStreaming ? "1" : "COLDECIMAL") + .getCurrentMinIntValue() + .intValue()); + Assert.assertNull( + innerBuffer + .tempStatsMap + .get(enableIcebergStreaming ? "1" : "COLDECIMAL") + .getCurrentMaxIntValue()); + Assert.assertNull( + innerBuffer + .tempStatsMap + .get(enableIcebergStreaming ? 
"1" : "COLDECIMAL") + .getCurrentMinIntValue()); Assert.assertEquals(1, innerBuffer.bufferedRowCount); Assert.assertEquals(0, innerBuffer.getTempRowCount()); Assert.assertEquals( - 1, innerBuffer.statsMap.get("COLDECIMAL").getCurrentMaxIntValue().intValue()); + 1, + innerBuffer + .statsMap + .get(enableIcebergStreaming ? "1" : "COLDECIMAL") + .getCurrentMaxIntValue() + .intValue()); Assert.assertEquals( - 1, innerBuffer.statsMap.get("COLDECIMAL").getCurrentMinIntValue().intValue()); - Assert.assertNull(innerBuffer.tempStatsMap.get("COLDECIMAL").getCurrentMaxIntValue()); - Assert.assertNull(innerBuffer.tempStatsMap.get("COLDECIMAL").getCurrentMinIntValue()); + 1, + innerBuffer + .statsMap + .get(enableIcebergStreaming ? "1" : "COLDECIMAL") + .getCurrentMinIntValue() + .intValue()); + Assert.assertNull( + innerBuffer + .tempStatsMap + .get(enableIcebergStreaming ? "1" : "COLDECIMAL") + .getCurrentMaxIntValue()); + Assert.assertNull( + innerBuffer + .tempStatsMap + .get(enableIcebergStreaming ? "1" : "COLDECIMAL") + .getCurrentMinIntValue()); row3.put("COLDECIMAL", 3); response = innerBuffer.insertRows(Arrays.asList(row2, row3), "1", "3"); @@ -1717,11 +1987,29 @@ public void testOnErrorAbortSkipBatch() { Assert.assertEquals(3, innerBuffer.bufferedRowCount); Assert.assertEquals(0, innerBuffer.getTempRowCount()); Assert.assertEquals( - 3, innerBuffer.statsMap.get("COLDECIMAL").getCurrentMaxIntValue().intValue()); + 3, + innerBuffer + .statsMap + .get(enableIcebergStreaming ? "1" : "COLDECIMAL") + .getCurrentMaxIntValue() + .intValue()); Assert.assertEquals( - 1, innerBuffer.statsMap.get("COLDECIMAL").getCurrentMinIntValue().intValue()); - Assert.assertNull(innerBuffer.tempStatsMap.get("COLDECIMAL").getCurrentMaxIntValue()); - Assert.assertNull(innerBuffer.tempStatsMap.get("COLDECIMAL").getCurrentMinIntValue()); + 1, + innerBuffer + .statsMap + .get(enableIcebergStreaming ? "1" : "COLDECIMAL") + .getCurrentMinIntValue() + .intValue()); + Assert.assertNull( + innerBuffer + .tempStatsMap + .get(enableIcebergStreaming ? "1" : "COLDECIMAL") + .getCurrentMaxIntValue()); + Assert.assertNull( + innerBuffer + .tempStatsMap + .get(enableIcebergStreaming ? "1" : "COLDECIMAL") + .getCurrentMinIntValue()); ChannelData data = innerBuffer.flush(); Assert.assertEquals(3, data.getRowCount()); @@ -2121,45 +2409,111 @@ private void testStructuredStatsE2EHelper(AbstractRowBuffer rowBuffer) { ChannelData result = rowBuffer.flush(); Map columnEpStats = result.getColumnEps(); - assertThat(columnEpStats.get("COLOBJECT.a").getCurrentMinIntValue()) + assertThat( + columnEpStats.get(enableIcebergStreaming ? "4" : "COLOBJECT.a").getCurrentMinIntValue()) .isEqualTo(BigInteger.valueOf(1)); - assertThat(columnEpStats.get("COLOBJECT.a").getCurrentMaxIntValue()) + assertThat( + columnEpStats.get(enableIcebergStreaming ? "4" : "COLOBJECT.a").getCurrentMaxIntValue()) .isEqualTo(BigInteger.valueOf(2)); - assertThat(columnEpStats.get("COLOBJECT.a").getCurrentNullCount()).isEqualTo(0); - assertThat(columnEpStats.get("COLOBJECT.a").getDistinctValues()).isEqualTo(2); - assertThat(columnEpStats.get("COLOBJECT.a").getNumberOfValues()).isEqualTo(EP_NV_UNKNOWN); - - assertThat(columnEpStats.get("COLOBJECT.b").getCurrentMinStrValue()) + assertThat( + columnEpStats.get(enableIcebergStreaming ? "4" : "COLOBJECT.a").getCurrentNullCount()) + .isEqualTo(0); + assertThat(columnEpStats.get(enableIcebergStreaming ? "4" : "COLOBJECT.a").getDistinctValues()) + .isEqualTo(2); + assertThat(columnEpStats.get(enableIcebergStreaming ? 
"4" : "COLOBJECT.a").getNumberOfValues()) + .isEqualTo(EP_NV_UNKNOWN); + + assertThat( + columnEpStats.get(enableIcebergStreaming ? "5" : "COLOBJECT.b").getCurrentMinStrValue()) .isEqualTo("string1".getBytes(StandardCharsets.UTF_8)); - assertThat(columnEpStats.get("COLOBJECT.b").getCurrentMaxStrValue()) + assertThat( + columnEpStats.get(enableIcebergStreaming ? "5" : "COLOBJECT.b").getCurrentMaxStrValue()) .isEqualTo("string1".getBytes(StandardCharsets.UTF_8)); - assertThat(columnEpStats.get("COLOBJECT.b").getCurrentNullCount()).isEqualTo(1); - assertThat(columnEpStats.get("COLOBJECT.b").getDistinctValues()).isEqualTo(1); - assertThat(columnEpStats.get("COLOBJECT.b").getNumberOfValues()).isEqualTo(EP_NV_UNKNOWN); - - assertThat(columnEpStats.get("COLMAP.key_value.key").getCurrentMinStrValue()) + assertThat( + columnEpStats.get(enableIcebergStreaming ? "5" : "COLOBJECT.b").getCurrentNullCount()) + .isEqualTo(1); + assertThat(columnEpStats.get(enableIcebergStreaming ? "5" : "COLOBJECT.b").getDistinctValues()) + .isEqualTo(1); + assertThat(columnEpStats.get(enableIcebergStreaming ? "5" : "COLOBJECT.b").getNumberOfValues()) + .isEqualTo(EP_NV_UNKNOWN); + + assertThat( + columnEpStats + .get(enableIcebergStreaming ? "6" : "COLMAP.key_value.key") + .getCurrentMinStrValue()) .isEqualTo("key1".getBytes(StandardCharsets.UTF_8)); - assertThat(columnEpStats.get("COLMAP.key_value.key").getCurrentMaxStrValue()) + assertThat( + columnEpStats + .get(enableIcebergStreaming ? "6" : "COLMAP.key_value.key") + .getCurrentMaxStrValue()) .isEqualTo("key2".getBytes(StandardCharsets.UTF_8)); - assertThat(columnEpStats.get("COLMAP.key_value.key").getCurrentNullCount()).isEqualTo(1); - assertThat(columnEpStats.get("COLMAP.key_value.key").getDistinctValues()).isEqualTo(2); - assertThat(columnEpStats.get("COLMAP.key_value.key").getNumberOfValues()).isEqualTo(3); - - assertThat(columnEpStats.get("COLMAP.key_value.value").getCurrentMinIntValue()) + assertThat( + columnEpStats + .get(enableIcebergStreaming ? "6" : "COLMAP.key_value.key") + .getCurrentNullCount()) + .isEqualTo(1); + assertThat( + columnEpStats + .get(enableIcebergStreaming ? "6" : "COLMAP.key_value.key") + .getDistinctValues()) + .isEqualTo(2); + assertThat( + columnEpStats + .get(enableIcebergStreaming ? "6" : "COLMAP.key_value.key") + .getNumberOfValues()) + .isEqualTo(3); + + assertThat( + columnEpStats + .get(enableIcebergStreaming ? "7" : "COLMAP.key_value.value") + .getCurrentMinIntValue()) .isEqualTo(BigInteger.ONE); - assertThat(columnEpStats.get("COLMAP.key_value.value").getCurrentMaxIntValue()) + assertThat( + columnEpStats + .get(enableIcebergStreaming ? "7" : "COLMAP.key_value.value") + .getCurrentMaxIntValue()) .isEqualTo(BigInteger.ONE); - assertThat(columnEpStats.get("COLMAP.key_value.value").getCurrentNullCount()).isEqualTo(1); - assertThat(columnEpStats.get("COLMAP.key_value.value").getDistinctValues()).isEqualTo(1); - assertThat(columnEpStats.get("COLMAP.key_value.value").getNumberOfValues()).isEqualTo(3); - - assertThat(columnEpStats.get("COLARRAY.list.element").getCurrentMinIntValue()) + assertThat( + columnEpStats + .get(enableIcebergStreaming ? "7" : "COLMAP.key_value.value") + .getCurrentNullCount()) + .isEqualTo(1); + assertThat( + columnEpStats + .get(enableIcebergStreaming ? "7" : "COLMAP.key_value.value") + .getDistinctValues()) + .isEqualTo(1); + assertThat( + columnEpStats + .get(enableIcebergStreaming ? 
"7" : "COLMAP.key_value.value") + .getNumberOfValues()) + .isEqualTo(3); + + assertThat( + columnEpStats + .get(enableIcebergStreaming ? "8" : "COLARRAY.list.element") + .getCurrentMinIntValue()) .isEqualTo(BigInteger.ONE); - assertThat(columnEpStats.get("COLARRAY.list.element").getCurrentMaxIntValue()) + assertThat( + columnEpStats + .get(enableIcebergStreaming ? "8" : "COLARRAY.list.element") + .getCurrentMaxIntValue()) .isEqualTo(BigInteger.ONE); - assertThat(columnEpStats.get("COLARRAY.list.element").getCurrentNullCount()).isEqualTo(1); - assertThat(columnEpStats.get("COLARRAY.list.element").getDistinctValues()).isEqualTo(1); - assertThat(columnEpStats.get("COLARRAY.list.element").getNumberOfValues()).isEqualTo(5); + assertThat( + columnEpStats + .get(enableIcebergStreaming ? "8" : "COLARRAY.list.element") + .getCurrentNullCount()) + .isEqualTo(1); + assertThat( + columnEpStats + .get(enableIcebergStreaming ? "8" : "COLARRAY.list.element") + .getDistinctValues()) + .isEqualTo(1); + assertThat( + columnEpStats + .get(enableIcebergStreaming ? "8" : "COLARRAY.list.element") + .getNumberOfValues()) + .isEqualTo(5); } private static Thread getThreadThatWaitsForLockReleaseAndFlushes( diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java index 55a749885..de1a7051c 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java @@ -7,7 +7,9 @@ import static net.snowflake.ingest.utils.Constants.ROLE; import static net.snowflake.ingest.utils.ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM; +import com.fasterxml.jackson.core.json.JsonReadFeature; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.json.JsonMapper; import java.math.BigDecimal; import java.math.BigInteger; import java.sql.Connection; @@ -68,7 +70,8 @@ public abstract class AbstractDataTypeTest { private String schemaName = "PUBLIC"; private SnowflakeStreamingIngestClient client; - protected static final ObjectMapper objectMapper = new ObjectMapper(); + protected static final ObjectMapper objectMapper = + JsonMapper.builder().enable(JsonReadFeature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER).build(); @Parameters(name = "{index}: {0}") public static Object[] parameters() { @@ -172,8 +175,7 @@ protected String createTable(String dataType) throws SQLException { protected String createIcebergTable(String dataType) throws SQLException { String tableName = getRandomIdentifier(); - String baseLocation = - String.format("SDK_IT/%s/%s/%s", databaseName, dataType.replace(" ", "_"), tableName); + String baseLocation = String.format("SDK_IT/%s/%s", databaseName, tableName); conn.createStatement() .execute( String.format( @@ -580,6 +582,9 @@ protected void testIcebergIngestAndQuery( .usingComparatorForType(BigDecimal::compareTo, BigDecimal.class) .usingRecursiveComparison() .isEqualTo(expectedValue); + } else if (expectedValue instanceof Map) { + Assertions.assertThat(objectMapper.readTree((String) res)) + .isEqualTo(objectMapper.valueToTree(expectedValue)); } else if (expectedValue instanceof Timestamp) { Assertions.assertThat(res.toString()).isEqualTo(expectedValue.toString()); } else { diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergDateTimeIT.java 
b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergDateTimeIT.java index a9606cf53..966cc5005 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergDateTimeIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergDateTimeIT.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + package net.snowflake.ingest.streaming.internal.datatypes; import java.sql.Date; diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergStructuredIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergStructuredIT.java index 73e3700f1..7d8b14d6a 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergStructuredIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergStructuredIT.java @@ -58,9 +58,7 @@ public void testStructuredDataType() throws Exception { assertStructuredDataType("array(string)", null); /* Map with null key */ - SFException ex = - Assertions.catchThrowableOfType( - SFException.class, + Assertions.assertThatThrownBy( () -> assertMap( "map(string, int)", @@ -68,19 +66,17 @@ public void testStructuredDataType() throws Exception { { put(null, 1); } - })); - Assertions.assertThat(ex) - .extracting(SFException::getVendorCode) + })) + .isInstanceOf(SFException.class) + .extracting("vendorCode") .isEqualTo(ErrorCode.INVALID_FORMAT_ROW.getMessageCode()); /* Unknown field */ - ex = - Assertions.catchThrowableOfType( - SFException.class, + Assertions.assertThatThrownBy( () -> - assertStructuredDataType("object(a int, b string)", "{\"a\": 1, \"c\": \"test\"}")); - Assertions.assertThat(ex) - .extracting(SFException::getVendorCode) + assertStructuredDataType("object(a int, b string)", "{\"a\": 1, \"c\": \"test\"}")) + .isInstanceOf(SFException.class) + .extracting("vendorCode") .isEqualTo(ErrorCode.INVALID_FORMAT_ROW.getMessageCode()); /* Null struct, map list. 
*/ @@ -123,6 +119,105 @@ public void testNestedDataType() throws Exception { "{\"a\": 1, \"b\": [1, 2, 3], \"c\": {\"key1\": 1}}"); } + @Test + public void testFieldName() throws Exception { + Iterable val = + (Iterable) + objectMapper.readValue( + "[" + + "{\"test\": 1, \"TEST\": 2, \"TeSt\": 3}," + + "{\"test\": 4, \"TEST\": 5, \"TeSt\": 6}," + + "{\"test\": 7, \"TEST\": 8, \"TeSt\": 9}" + + "]", + Object.class); + testIcebergIngestAndQuery( + "object(test int, TEST int, TeSt int)", val, "select {columnName} from {tableName}", val); + + /* Single row test, check EP info */ + objectMapper.readValue("[{\"test\\.test\": 1, \"TEST\": 2, \"TeSt\": 3}]", Object.class); + val = + (Iterable) + objectMapper.readValue( + "[{\"obj\\.obj\": false, \"test_test\": 1, \"test_x5Ftest\": 2, \"obj\\\\.obj\":" + + " 3.0, \"❄️\": 4.0, \"5566\": \"5.0\", \"_5566\": \"6.0\", \"_\":" + + " \"41424344454647484950\", \"_x27_x44_xFE_x0F\": \"41424344\", \"\\\"\":" + + " \"2024-01-01\", \"\\\\\": \"12:00:00\", \"\":" + + " \"2024-01-01T12:00:00.000000\", \"временнаяметка समयमोहर 時間戳記 ㄕˊㄔㄨㄛ 타임스탬프" + + " タイムスタンプ tidsstämpel\": \"2024-01-01T12:00:00.000000+08:00\", \"\\.\":" + + " {\"key1\": 1}, \"\\\\.\": [1, 2, 3], \"obj\": {\"obj\": true}}]", + Object.class); + testIcebergIngestAndQuery( + "object(" + + "\"obj.obj\" boolean, " + + "test_test int, " + + "test_x5Ftest long, " + + "\"obj\\.obj\" float, " + + "\"❄️\" double, " + + "\"5566\" string, " + + "_5566 string, " + + "\"_\" fixed(10), " + + "_x27_x44_xFE_x0F binary, " + + "\"\"\"\" date, " + + "\"\\\" string, " + + "\"\" string, " + + "\"временнаяметка समयमोहर 時間戳記 ㄕˊㄔㄨㄛ 타임스탬프 タイムスタンプ tidsstämpel\" string, " + + "\".\" map(string, int)," + + "\"\\.\" array(int)," + + "obj object(obj boolean))", + val, + "select {columnName} from {tableName}", + val); + + /* Multiple rows test, check parquet file */ + val = + (Iterable) + objectMapper.readValue( + "[{\"obj\\.obj\": false, \"test_test\": 1, \"test_x5Ftest\": 2, \"obj\\\\.obj\":" + + " 3.0, \"❄️\": 4.0, \"5566\": \"5.0\", \"_5566\": \"6.0\", \"_\":" + + " \"41424344454647484950\", \"_x27_x44_xFE_x0F\": \"41424344\", \"\\\"\":" + + " \"2024-01-01\", \"\\\\\": \"12:00:00\", \"\":" + + " \"2024-01-01T12:00:00.000000\", \"временнаяметка समयमोहर 時間戳記 ㄕˊㄔㄨㄛ 타임스탬프" + + " タイムスタンプ tidsstämpel\": \"2024-01-01T12:00:00.000000+08:00\", \"\\.\":" + + " {\"key1\": 1}, \"\\\\.\": [1, 2, 3], \"obj\": {\"obj\":" + + " true}},{\"obj\\.obj\": true, \"test_test\": 2, \"test_x5Ftest\": 3," + + " \"obj\\\\.obj\": 4.0, \"❄️\": 5.0, \"5566\": \"6.0\", \"_5566\": \"7.0\"," + + " \"_\": \"51525354555657585960\", \"_x27_x44_xFE_x0F\": \"51525354\"," + + " \"\\\"\": \"2024-01-02\", \"\\\\\": \"13:00:00\", \"\":" + + " \"2024-01-02T13:00:00.000000\", \"временнаяметка समयमोहर 時間戳記 ㄕˊㄔㄨㄛ 타임스탬프" + + " タイムスタンプ tidsstämpel\": \"2024-01-02T13:00:00.000000+08:00\", \"\\.\":" + + " {\"key2\": 2}, \"\\\\.\": [4, 5, 6], \"obj\": {\"obj\":" + + " false}},{\"obj\\.obj\": false, \"test_test\": 3, \"test_x5Ftest\": 4," + + " \"obj\\\\.obj\": 5.0, \"❄️\": 6.0, \"5566\": \"7.0\", \"_5566\": \"8.0\"," + + " \"_\": \"61626364656667686970\", \"_x27_x44_xFE_x0F\": \"61626364\"," + + " \"\\\"\": \"2024-01-03\", \"\\\\\": \"14:00:00\", \"\":" + + " \"2024-01-03T14:00:00.000000\", \"временнаяметка समयमोहर 時間戳記 ㄕˊㄔㄨㄛ 타임스탬프" + + " タイムスタンプ tidsstämpel\": \"2024-01-03T14:00:00.000000+08:00\", \"\\.\":" + + " {\"key3\": 3}, \"\\\\.\": [7, 8, 9], \"obj\": {\"obj\": true}}]", + Object.class); + + testIcebergIngestAndQuery( + "object(" + + "\"obj.obj\" boolean, " + + 
"test_test int, " + + "test_x5Ftest long, " + + "\"obj\\.obj\" float, " + + "\"❄️\" double, " + + "\"5566\" string, " + + "_5566 string, " + + "\"_\" fixed(10), " + + "_x27_x44_xFE_x0F binary, " + + "\"\"\"\" date, " + + "\"\\\" string, " + + "\"\" string, " + + "\"временнаяметка समयमोहर 時間戳記 ㄕˊㄔㄨㄛ 타임스탬프 タイムスタンプ tidsstämpel\" string, " + + "\".\" map(string, int)," + + "\"\\.\" array(int)," + + "obj object(obj boolean))", + val, + "select {columnName} from {tableName}", + val); + } + private void assertStructuredDataType(String dataType, String value) throws Exception { String tableName = createIcebergTable(dataType); String offsetToken = UUID.randomUUID().toString(); diff --git a/src/test/java/net/snowflake/ingest/utils/SubColumnFinderTest.java b/src/test/java/net/snowflake/ingest/utils/SubColumnFinderTest.java index b5c538d00..90c95a14a 100644 --- a/src/test/java/net/snowflake/ingest/utils/SubColumnFinderTest.java +++ b/src/test/java/net/snowflake/ingest/utils/SubColumnFinderTest.java @@ -4,16 +4,17 @@ package net.snowflake.ingest.utils; -import static net.snowflake.ingest.utils.Utils.concatDotPath; import static org.assertj.core.api.Assertions.assertThat; +import java.util.ArrayList; +import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.stream.Collectors; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.MessageTypeParser; +import org.apache.parquet.schema.Type; import org.junit.Test; public class SubColumnFinderTest { @@ -102,39 +103,46 @@ public void testNestedSchema() { private void assertFindSubColumns(MessageType schema) { SubColumnFinder subColumnFinder = new SubColumnFinder(schema); - for (String dotPath : getAllPossibleDotPath(schema)) { - assertThat(subColumnFinder.getSubColumns(dotPath)) + for (String id : getAllPossibleFieldId(schema)) { + assertThat(subColumnFinder.getSubColumns(id)) .usingRecursiveComparison() .ignoringCollectionOrder() - .isEqualTo(findSubColumn(schema, dotPath)); + .isEqualTo(findSubColumn(schema, id, false)); } } - private Iterable getAllPossibleDotPath(MessageType schema) { - Set dotPaths = new HashSet<>(); + private Iterable getAllPossibleFieldId(MessageType schema) { + Set ids = new HashSet<>(); for (ColumnDescriptor column : schema.getColumns()) { String[] path = column.getPath(); if (path.length == 0) { continue; } - String dotPath = path[0]; - dotPaths.add(dotPath); for (int i = 1; i < path.length; i++) { - dotPath = concatDotPath(dotPath, path[i]); - dotPaths.add(dotPath); + Type type = schema.getType(Arrays.copyOfRange(path, 0, i)); + if (type.getId() != null) { + ids.add(type.getId().toString()); + } } } - return dotPaths; + return ids; } - private List findSubColumn(MessageType schema, String dotPath) { - return schema.getColumns().stream() - .map(ColumnDescriptor::getPath) - .map(Utils::concatDotPath) - .filter( - s -> - s.startsWith(dotPath) - && (s.length() == dotPath.length() || s.charAt(dotPath.length()) == '.')) - .collect(Collectors.toList()); + private List findSubColumn(Type node, String id, boolean isDescendant) { + if (node.getId() != null && node.getId().toString().equals(id)) { + isDescendant = true; + } + if (node.isPrimitive()) { + if (isDescendant) { + return Arrays.asList(node.getId().toString()); + } + return new ArrayList<>(); + } + + List subColumn = new ArrayList<>(); + for (Type child : node.asGroupType().getFields()) { + subColumn.addAll(findSubColumn(child, id, isDescendant)); + } + 
return subColumn; } }
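
A note on the RowBufferTest hunks above: with enableIcebergStreaming set, per-column stats are keyed by Parquet field id ("1", and "4" through "8" for COLOBJECT.a, COLOBJECT.b, COLMAP.key_value.key, COLMAP.key_value.value, and COLARRAY.list.element, respectively) rather than by column name, so every assertion repeats the ternary `enableIcebergStreaming ? "<fieldId>" : "<NAME>"`. A small helper on the parameterized test class would collapse each lookup back to one line; the sketch below is a hypothetical refactoring, not part of the patch, and assumes the class exposes the enableIcebergStreaming flag as in RowBufferTest:

    /** Resolves the statsMap/tempStatsMap key for the current test mode. */
    private String statsKey(String fieldId, String columnName) {
      // Iceberg mode keys RowBufferStats by Parquet field id; the classic
      // streaming path keys them by column name.
      return enableIcebergStreaming ? fieldId : columnName;
    }

    // Hypothetical usage in the assertions above:
    // Assert.assertEquals(
    //     3,
    //     innerBuffer.statsMap.get(statsKey("1", "COLDECIMAL"))
    //         .getCurrentMaxIntValue()
    //         .intValue());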
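
A note on the AbstractDataTypeTest change above: the shared ObjectMapper is now built with JsonReadFeature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER because the testFieldName cases in IcebergStructuredIT parse JSON whose keys contain escapes such as \. and \\ that strict JSON rejects. A minimal standalone illustration of the feature (the class name and input are assumptions, not from the patch):

    import com.fasterxml.jackson.core.json.JsonReadFeature;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.json.JsonMapper;
    import java.util.Map;

    class LenientEscapeDemo {
      public static void main(String[] args) throws Exception {
        ObjectMapper lenient =
            JsonMapper.builder()
                .enable(JsonReadFeature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER)
                .build();
        // Strict JSON rejects the "\." escape; the lenient mapper reads it
        // as a plain ".", so the key below parses as "a.b".
        Map<?, ?> row = lenient.readValue("{\"a\\.b\": 1}", Map.class);
        System.out.println(row); // prints {a.b=1}
      }
    }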
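
A note on the SubColumnFinderTest changes above: SubColumnFinder is now queried by Parquet field id instead of dot path, and the reference findSubColumn recursion collects the ids of every primitive leaf at or below the node whose id matches. A usage sketch under those semantics (the schema and the expected results are illustrative assumptions; field ids use the `= <n>` annotation that MessageTypeParser accepts):

    MessageType schema =
        MessageTypeParser.parseMessageType(
            "message t {"
                + " optional group obj = 1 {"
                + "   optional int32 a = 2;"
                + "   optional binary b = 3;"
                + " }"
                + "}");
    SubColumnFinder finder = new SubColumnFinder(schema);
    finder.getSubColumns("1"); // expected ["2", "3"]: all leaves under obj
    finder.getSubColumns("2"); // expected ["2"]: a leaf is its own sub-column
    finder.getDotPath("3");    // expected "obj.b"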