diff --git a/src/main/java/net/snowflake/ingest/utils/IcebergDataTypeParser.java b/src/main/java/net/snowflake/ingest/utils/IcebergDataTypeParser.java index 752fccb83..b2eb03cda 100644 --- a/src/main/java/net/snowflake/ingest/utils/IcebergDataTypeParser.java +++ b/src/main/java/net/snowflake/ingest/utils/IcebergDataTypeParser.java @@ -44,6 +44,8 @@ public class IcebergDataTypeParser { private static final String ELEMENT_REQUIRED = "element-required"; private static final String VALUE_REQUIRED = "value-required"; + private static final String EMPTY_FIELD_CHAR = "\\"; + /** Object mapper for this class */ private static final ObjectMapper MAPPER = new ObjectMapper(); @@ -88,7 +90,7 @@ public static org.apache.parquet.schema.Type parseIcebergDataTypeStringToParquet name, icebergDataType)); } } - return decodeAvroFieldName(parquetType); + return replaceWithOriginalFieldName(parquetType, icebergType, name); } /** @@ -161,13 +163,13 @@ public static Type getTypeFromJson(@Nonnull JsonNode jsonNode) { int id = JsonUtil.getInt(ID, field); - /* - * Encoded the underscore in the field name to avoid the field name duplication after Avro - * schema sanitization in TypeToMessageType. See AvroSchemaUtil#sanitize for more details. - */ + /* TypeToMessageType throws on empty field name, use a backslash to represent it and escape remaining backslash. */ String name = JsonUtil.getString(NAME, field) - .replace("_", "_x" + Integer.toHexString('_').toUpperCase()); + .replace(EMPTY_FIELD_CHAR, EMPTY_FIELD_CHAR + EMPTY_FIELD_CHAR); + if (name.isEmpty()) { + name = EMPTY_FIELD_CHAR; + } Type type = getTypeFromJson(field.get(TYPE)); String doc = JsonUtil.getStringOrNull(DOC, field); @@ -222,38 +224,60 @@ public static Types.MapType mapFromJson(JsonNode json) { } } - private static org.apache.parquet.schema.Type decodeAvroFieldName( - org.apache.parquet.schema.Type type) { - StringBuilder sb = new StringBuilder(); - String name = type.getName(); - for (int i = 0; i < name.length(); i++) { - char c = name.charAt(i); - if (c == '_' && i + 1 < name.length() && name.charAt(i + 1) == 'x') { - sb.append((char) Integer.parseInt(name.substring(i + 2, i + 4), 16)); - i += 3; - } else { - sb.append(c); - } + private static org.apache.parquet.schema.Type replaceWithOriginalFieldName( + org.apache.parquet.schema.Type parquetType, Type icebergType, String fieldName) { + if (parquetType.isPrimitive() != icebergType.isPrimitiveType() + || (!parquetType.isPrimitive() + && parquetType.getLogicalTypeAnnotation() + == null /* ignore outer layer of map or list */ + && parquetType.asGroupType().getFieldCount() + != icebergType.asNestedType().fields().size())) { + throw new IllegalArgumentException( + String.format( + "Parquet type and Iceberg type mismatch: %s, %s", parquetType, icebergType)); } - - if (type.isPrimitive()) { + if (parquetType.isPrimitive()) { /* rename field name */ return org.apache.parquet.schema.Types.primitive( - type.asPrimitiveType().getPrimitiveTypeName(), type.getRepetition()) - .as(type.asPrimitiveType().getLogicalTypeAnnotation()) - .id(type.getId().intValue()) - .length(type.asPrimitiveType().getTypeLength()) - .named(sb.toString()); + parquetType.asPrimitiveType().getPrimitiveTypeName(), parquetType.getRepetition()) + .as(parquetType.asPrimitiveType().getLogicalTypeAnnotation()) + .id(parquetType.getId().intValue()) + .length(parquetType.asPrimitiveType().getTypeLength()) + .named(fieldName); } else { org.apache.parquet.schema.Types.GroupBuilder builder = - org.apache.parquet.schema.Types.buildGroup(type.getRepetition()); - for (org.apache.parquet.schema.Type fieldType : type.asGroupType().getFields()) { - builder.addField(decodeAvroFieldName(fieldType)); + org.apache.parquet.schema.Types.buildGroup(parquetType.getRepetition()); + for (org.apache.parquet.schema.Type parquetFieldType : + parquetType.asGroupType().getFields()) { + if (parquetFieldType.getId() == null) { + /* middle layer of map or list. Skip this level as parquet's using 3-level list/map while iceberg's using 2-level list/map */ + builder.addField( + replaceWithOriginalFieldName( + parquetFieldType, icebergType, parquetFieldType.getName())); + } else { + Types.NestedField icebergField = + icebergType.asNestedType().field(parquetFieldType.getId().intValue()); + if (icebergField == null) { + throw new IllegalArgumentException( + String.format( + "Cannot find Iceberg field with id %d in Iceberg type: %s", + parquetFieldType.getId().intValue(), icebergType)); + } + builder.addField( + replaceWithOriginalFieldName( + parquetFieldType, + icebergField.type(), + icebergField.name().equals(EMPTY_FIELD_CHAR) + ? "" + : icebergField + .name() + .replace(EMPTY_FIELD_CHAR + EMPTY_FIELD_CHAR, EMPTY_FIELD_CHAR))); + } } - if (type.getId() != null) { - builder.id(type.getId().intValue()); + if (parquetType.getId() != null) { + builder.id(parquetType.getId().intValue()); } - return builder.as(type.getLogicalTypeAnnotation()).named(sb.toString()); + return builder.as(parquetType.getLogicalTypeAnnotation()).named(fieldName); } } } diff --git a/src/main/java/net/snowflake/ingest/utils/Utils.java b/src/main/java/net/snowflake/ingest/utils/Utils.java index 9e6e1d323..7fef46cd0 100644 --- a/src/main/java/net/snowflake/ingest/utils/Utils.java +++ b/src/main/java/net/snowflake/ingest/utils/Utils.java @@ -424,8 +424,8 @@ public static String getFullyQualifiedChannelName( public static String concatDotPath(String... path) { StringBuilder sb = new StringBuilder(); for (String p : path) { - if (isNullOrEmpty(p)) { - throw new IllegalArgumentException("Path cannot be null or empty"); + if (p == null) { + throw new IllegalArgumentException("Path cannot be null"); } if (sb.length() > 0) { sb.append("."); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java index baa370e16..c2f34cc6e 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java @@ -7,13 +7,16 @@ import static net.snowflake.ingest.utils.Constants.ROLE; import static net.snowflake.ingest.utils.ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM; +import com.fasterxml.jackson.core.json.JsonReadFeature; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.json.JsonMapper; import java.math.BigDecimal; import java.math.BigInteger; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.Timestamp; import java.time.ZoneId; import java.util.HashMap; import java.util.Map; @@ -67,7 +70,8 @@ public abstract class AbstractDataTypeTest { private String schemaName = "PUBLIC"; private SnowflakeStreamingIngestClient client; - protected static final ObjectMapper objectMapper = new ObjectMapper(); + protected static final ObjectMapper objectMapper = + JsonMapper.builder().enable(JsonReadFeature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER).build(); @Parameters(name = "{index}: {0}") public static Object[] parameters() { @@ -563,6 +567,11 @@ protected void testIcebergIngestAndQuery( .usingComparatorForType(BigDecimal::compareTo, BigDecimal.class) .usingRecursiveComparison() .isEqualTo(expectedValue); + } else if (expectedValue instanceof Map) { + Assertions.assertThat(objectMapper.readTree((String) res)) + .isEqualTo(objectMapper.valueToTree(expectedValue)); + } else if (expectedValue instanceof Timestamp) { + Assertions.assertThat(res.toString()).isEqualTo(expectedValue.toString()); } else { Assertions.assertThat(res).isEqualTo(expectedValue); } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergDateTimeIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergDateTimeIT.java index a9606cf53..966cc5005 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergDateTimeIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergDateTimeIT.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + package net.snowflake.ingest.streaming.internal.datatypes; import java.sql.Date; diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergStructuredIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergStructuredIT.java index 1fb341bf8..405223645 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergStructuredIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/IcebergStructuredIT.java @@ -43,8 +43,7 @@ public void before() throws Exception { @Test public void testStructuredDataType() throws Exception { assertStructuredDataType( - "object(a int, A int, \"b.b\" string, b_x2Eb boolean)", - "{\"a\": 1, \"A\": 1, \"b.b\": \"test\", \"b_x2Eb\": true}"); + "object(a int, b string, c boolean)", "{\"a\": 1, \"b\": \"test\", \"c\": true}"); assertStructuredDataType("map(string, int)", "{\"key1\": 1}"); assertStructuredDataType("array(int)", "[1, 2, 3]"); assertMap( @@ -118,6 +117,105 @@ public void testNestedDataType() throws Exception { "{\"a\": 1, \"b\": [1, 2, 3], \"c\": {\"key1\": 1}}"); } + @Test + public void testFieldName() throws Exception { + Iterable val = + (Iterable) + objectMapper.readValue( + "[" + + "{\"test\": 1, \"TEST\": 2, \"TeSt\": 3}," + + "{\"test\": 4, \"TEST\": 5, \"TeSt\": 6}," + + "{\"test\": 7, \"TEST\": 8, \"TeSt\": 9}" + + "]", + Object.class); + testIcebergIngestAndQuery( + "object(test int, TEST int, TeSt int)", val, "select {columnName} from {tableName}", val); + + /* Single row test, check EP info */ + objectMapper.readValue("[{\"test\\.test\": 1, \"TEST\": 2, \"TeSt\": 3}]", Object.class); + val = + (Iterable) + objectMapper.readValue( + "[{\"obj\\.obj\": false, \"test_test\": 1, \"test_x5Ftest\": 2, \"obj\\\\.obj\":" + + " 3.0, \"❄️\": 4.0, \"5566\": \"5.0\", \"_5566\": \"6.0\", \"_\":" + + " \"41424344454647484950\", \"_x27_x44_xFE_x0F\": \"41424344\", \"\\\"\":" + + " \"2024-01-01\", \"\\\\\": \"12:00:00\", \"\":" + + " \"2024-01-01T12:00:00.000000\", \"временнаяметка समयमोहर 時間戳記 ㄕˊㄔㄨㄛ 타임스탬프" + + " タイムスタンプ tidsstämpel\": \"2024-01-01T12:00:00.000000+08:00\", \"\\.\":" + + " {\"key1\": 1}, \"\\\\.\": [1, 2, 3], \"obj\": {\"obj\": true}}]", + Object.class); + testIcebergIngestAndQuery( + "object(" + + "\"obj.obj\" boolean, " + + "test_test int, " + + "test_x5Ftest long, " + + "\"obj\\.obj\" float, " + + "\"❄️\" double, " + + "\"5566\" string, " + + "_5566 string, " + + "\"_\" fixed(10), " + + "_x27_x44_xFE_x0F binary, " + + "\"\"\"\" date, " + + "\"\\\" string, " + + "\"\" string, " + + "\"временнаяметка समयमोहर 時間戳記 ㄕˊㄔㄨㄛ 타임스탬프 タイムスタンプ tidsstämpel\" string, " + + "\".\" map(string, int)," + + "\"\\.\" array(int)," + + "obj object(obj boolean))", + val, + "select {columnName} from {tableName}", + val); + + /* Multiple rows test, check parquet file */ + val = + (Iterable) + objectMapper.readValue( + "[{\"obj\\.obj\": false, \"test_test\": 1, \"test_x5Ftest\": 2, \"obj\\\\.obj\":" + + " 3.0, \"❄️\": 4.0, \"5566\": \"5.0\", \"_5566\": \"6.0\", \"_\":" + + " \"41424344454647484950\", \"_x27_x44_xFE_x0F\": \"41424344\", \"\\\"\":" + + " \"2024-01-01\", \"\\\\\": \"12:00:00\", \"\":" + + " \"2024-01-01T12:00:00.000000\", \"временнаяметка समयमोहर 時間戳記 ㄕˊㄔㄨㄛ 타임스탬프" + + " タイムスタンプ tidsstämpel\": \"2024-01-01T12:00:00.000000+08:00\", \"\\.\":" + + " {\"key1\": 1}, \"\\\\.\": [1, 2, 3], \"obj\": {\"obj\":" + + " true}},{\"obj\\.obj\": true, \"test_test\": 2, \"test_x5Ftest\": 3," + + " \"obj\\\\.obj\": 4.0, \"❄️\": 5.0, \"5566\": \"6.0\", \"_5566\": \"7.0\"," + + " \"_\": \"51525354555657585960\", \"_x27_x44_xFE_x0F\": \"51525354\"," + + " \"\\\"\": \"2024-01-02\", \"\\\\\": \"13:00:00\", \"\":" + + " \"2024-01-02T13:00:00.000000\", \"временнаяметка समयमोहर 時間戳記 ㄕˊㄔㄨㄛ 타임스탬프" + + " タイムスタンプ tidsstämpel\": \"2024-01-02T13:00:00.000000+08:00\", \"\\.\":" + + " {\"key2\": 2}, \"\\\\.\": [4, 5, 6], \"obj\": {\"obj\":" + + " false}},{\"obj\\.obj\": false, \"test_test\": 3, \"test_x5Ftest\": 4," + + " \"obj\\\\.obj\": 5.0, \"❄️\": 6.0, \"5566\": \"7.0\", \"_5566\": \"8.0\"," + + " \"_\": \"61626364656667686970\", \"_x27_x44_xFE_x0F\": \"61626364\"," + + " \"\\\"\": \"2024-01-03\", \"\\\\\": \"14:00:00\", \"\":" + + " \"2024-01-03T14:00:00.000000\", \"временнаяметка समयमोहर 時間戳記 ㄕˊㄔㄨㄛ 타임스탬프" + + " タイムスタンプ tidsstämpel\": \"2024-01-03T14:00:00.000000+08:00\", \"\\.\":" + + " {\"key3\": 3}, \"\\\\.\": [7, 8, 9], \"obj\": {\"obj\": true}}]", + Object.class); + + testIcebergIngestAndQuery( + "object(" + + "\"obj.obj\" boolean, " + + "test_test int, " + + "test_x5Ftest long, " + + "\"obj\\.obj\" float, " + + "\"❄️\" double, " + + "\"5566\" string, " + + "_5566 string, " + + "\"_\" fixed(10), " + + "_x27_x44_xFE_x0F binary, " + + "\"\"\"\" date, " + + "\"\\\" string, " + + "\"\" string, " + + "\"временнаяметка समयमोहर 時間戳記 ㄕˊㄔㄨㄛ 타임스탬프 タイムスタンプ tidsstämpel\" string, " + + "\".\" map(string, int)," + + "\"\\.\" array(int)," + + "obj object(obj boolean))", + val, + "select {columnName} from {tableName}", + val); + } + private void assertStructuredDataType(String dataType, String value) throws Exception { String tableName = createIcebergTable(dataType); String offsetToken = UUID.randomUUID().toString();