
Commit

fix non-unicode & empty field name
sfc-gh-alhuang committed Oct 23, 2024
1 parent 8a6b9e4 commit 6dc18a1
Showing 5 changed files with 171 additions and 36 deletions.
86 changes: 55 additions & 31 deletions src/main/java/net/snowflake/ingest/utils/IcebergDataTypeParser.java
@@ -44,6 +44,8 @@ public class IcebergDataTypeParser {
private static final String ELEMENT_REQUIRED = "element-required";
private static final String VALUE_REQUIRED = "value-required";

private static final String EMPTY_FIELD_CHAR = "\\";

/** Object mapper for this class */
private static final ObjectMapper MAPPER = new ObjectMapper();

@@ -88,7 +90,7 @@ public static org.apache.parquet.schema.Type parseIcebergDataTypeStringToParquet
name, icebergDataType));
}
}
return decodeAvroFieldName(parquetType);
return replaceWithOriginalFieldName(parquetType, icebergType, name);
}

/**
@@ -161,13 +163,13 @@ public static Type getTypeFromJson(@Nonnull JsonNode jsonNode) {

int id = JsonUtil.getInt(ID, field);

/*
* Encoded the underscore in the field name to avoid the field name duplication after Avro
* schema sanitization in TypeToMessageType. See AvroSchemaUtil#sanitize for more details.
*/
/* TypeToMessageType throws on an empty field name; represent it with a backslash and escape any pre-existing backslashes. */
String name =
JsonUtil.getString(NAME, field)
.replace("_", "_x" + Integer.toHexString('_').toUpperCase());
.replace(EMPTY_FIELD_CHAR, EMPTY_FIELD_CHAR + EMPTY_FIELD_CHAR);
if (name.isEmpty()) {
name = EMPTY_FIELD_CHAR;
}
Type type = getTypeFromJson(field.get(TYPE));

String doc = JsonUtil.getStringOrNull(DOC, field);
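
For reference, a minimal self-contained sketch of the escape scheme the new code relies on (not part of the commit; encodeFieldName/decodeFieldName are hypothetical helper names): an empty Iceberg field name is stood in for by a single backslash so TypeToMessageType never sees an empty string, existing backslashes are doubled so the two cases cannot collide, and decoding reverses both steps.

class EmptyFieldNameEscapeSketch {
  private static final String EMPTY_FIELD_CHAR = "\\";

  static String encodeFieldName(String original) {
    // Double any literal backslashes, then stand in for the empty name with "\".
    String escaped = original.replace(EMPTY_FIELD_CHAR, EMPTY_FIELD_CHAR + EMPTY_FIELD_CHAR);
    return escaped.isEmpty() ? EMPTY_FIELD_CHAR : escaped;
  }

  static String decodeFieldName(String encoded) {
    // A lone "\" stands for the empty name; otherwise collapse doubled backslashes.
    return encoded.equals(EMPTY_FIELD_CHAR)
        ? ""
        : encoded.replace(EMPTY_FIELD_CHAR + EMPTY_FIELD_CHAR, EMPTY_FIELD_CHAR);
  }

  public static void main(String[] args) {
    System.out.println(decodeFieldName(encodeFieldName("")));      // prints an empty string
    System.out.println(decodeFieldName(encodeFieldName("a\\b")));  // prints a\b
  }
}
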
@@ -222,38 +224,60 @@ public static Types.MapType mapFromJson(JsonNode json) {
}
}

private static org.apache.parquet.schema.Type decodeAvroFieldName(
org.apache.parquet.schema.Type type) {
StringBuilder sb = new StringBuilder();
String name = type.getName();
for (int i = 0; i < name.length(); i++) {
char c = name.charAt(i);
if (c == '_' && i + 1 < name.length() && name.charAt(i + 1) == 'x') {
sb.append((char) Integer.parseInt(name.substring(i + 2, i + 4), 16));
i += 3;
} else {
sb.append(c);
}
private static org.apache.parquet.schema.Type replaceWithOriginalFieldName(
org.apache.parquet.schema.Type parquetType, Type icebergType, String fieldName) {
if (parquetType.isPrimitive() != icebergType.isPrimitiveType()
|| (!parquetType.isPrimitive()
&& parquetType.getLogicalTypeAnnotation()
== null /* ignore outer layer of map or list */
&& parquetType.asGroupType().getFieldCount()
!= icebergType.asNestedType().fields().size())) {
throw new IllegalArgumentException(
String.format(
"Parquet type and Iceberg type mismatch: %s, %s", parquetType, icebergType));
}

if (type.isPrimitive()) {
if (parquetType.isPrimitive()) {
/* rename field name */
return org.apache.parquet.schema.Types.primitive(
type.asPrimitiveType().getPrimitiveTypeName(), type.getRepetition())
.as(type.asPrimitiveType().getLogicalTypeAnnotation())
.id(type.getId().intValue())
.length(type.asPrimitiveType().getTypeLength())
.named(sb.toString());
parquetType.asPrimitiveType().getPrimitiveTypeName(), parquetType.getRepetition())
.as(parquetType.asPrimitiveType().getLogicalTypeAnnotation())
.id(parquetType.getId().intValue())
.length(parquetType.asPrimitiveType().getTypeLength())
.named(fieldName);
} else {
org.apache.parquet.schema.Types.GroupBuilder<org.apache.parquet.schema.GroupType> builder =
org.apache.parquet.schema.Types.buildGroup(type.getRepetition());
for (org.apache.parquet.schema.Type fieldType : type.asGroupType().getFields()) {
builder.addField(decodeAvroFieldName(fieldType));
org.apache.parquet.schema.Types.buildGroup(parquetType.getRepetition());
for (org.apache.parquet.schema.Type parquetFieldType :
parquetType.asGroupType().getFields()) {
if (parquetFieldType.getId() == null) {
/* Middle layer of a map or list: skip this level, since Parquet uses a 3-level list/map representation while Iceberg uses a 2-level one. */
builder.addField(
replaceWithOriginalFieldName(
parquetFieldType, icebergType, parquetFieldType.getName()));
} else {
Types.NestedField icebergField =
icebergType.asNestedType().field(parquetFieldType.getId().intValue());
if (icebergField == null) {
throw new IllegalArgumentException(
String.format(
"Cannot find Iceberg field with id %d in Iceberg type: %s",
parquetFieldType.getId().intValue(), icebergType));
}
builder.addField(
replaceWithOriginalFieldName(
parquetFieldType,
icebergField.type(),
icebergField.name().equals(EMPTY_FIELD_CHAR)
? ""
: icebergField
.name()
.replace(EMPTY_FIELD_CHAR + EMPTY_FIELD_CHAR, EMPTY_FIELD_CHAR)));
}
}
if (type.getId() != null) {
builder.id(type.getId().intValue());
if (parquetType.getId() != null) {
builder.id(parquetType.getId().intValue());
}
return builder.as(type.getLogicalTypeAnnotation()).named(sb.toString());
return builder.as(parquetType.getLogicalTypeAnnotation()).named(fieldName);
}
}
}
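
To make the id-less "middle layer" case concrete, here is an illustrative sketch of the 3-level Parquet shape a LIST column takes (assumptions: the "list"/"element" naming and the field ids 2 and 3 are chosen for the example). The repeated group in the middle carries no field id, which is why the recursion above passes it straight through and matches the same Iceberg type one level further down.

import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Type.Repetition;
import org.apache.parquet.schema.Types;

class ThreeLevelListSketch {
  public static void main(String[] args) {
    // Roughly the structure TypeToMessageType emits for an Iceberg list(int) column.
    GroupType threeLevelList =
        Types.buildGroup(Repetition.OPTIONAL)
            .as(LogicalTypeAnnotation.listType())
            .addField(
                Types.buildGroup(Repetition.REPEATED) // middle layer: no field id
                    .addField(
                        Types.primitive(PrimitiveTypeName.INT32, Repetition.OPTIONAL)
                            .id(3)
                            .named("element"))
                    .named("list"))
            .id(2)
            .named("col");
    System.out.println(threeLevelList);
  }
}
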
4 changes: 2 additions & 2 deletions src/main/java/net/snowflake/ingest/utils/Utils.java
@@ -424,8 +424,8 @@ public static String getFullyQualifiedChannelName(
public static String concatDotPath(String... path) {
StringBuilder sb = new StringBuilder();
for (String p : path) {
if (isNullOrEmpty(p)) {
throw new IllegalArgumentException("Path cannot be null or empty");
if (p == null) {
throw new IllegalArgumentException("Path cannot be null");
}
if (sb.length() > 0) {
sb.append(".");
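
A hedged usage sketch of the relaxed check (the rest of the loop body is collapsed in this view, so the exact rendering of empty segments is not shown here): empty path parts are now accepted, which matters once a struct field name can be the empty string, and only null still throws.

import net.snowflake.ingest.utils.Utils;

class ConcatDotPathSketch {
  public static void main(String[] args) {
    // Previously this threw "Path cannot be null or empty"; now only null is rejected.
    String dotPath = Utils.concatDotPath("outerCol", "", "innerCol");
    System.out.println(dotPath);
  }
}
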
@@ -7,13 +7,16 @@
import static net.snowflake.ingest.utils.Constants.ROLE;
import static net.snowflake.ingest.utils.ParameterProvider.BDEC_PARQUET_COMPRESSION_ALGORITHM;

import com.fasterxml.jackson.core.json.JsonReadFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.ZoneId;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -67,7 +70,8 @@ public abstract class AbstractDataTypeTest {

private String schemaName = "PUBLIC";
private SnowflakeStreamingIngestClient client;
protected static final ObjectMapper objectMapper = new ObjectMapper();
protected static final ObjectMapper objectMapper =
JsonMapper.builder().enable(JsonReadFeature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER).build();

@Parameters(name = "{index}: {0}")
public static Object[] parameters() {
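
For context, a small self-contained sketch (not from the commit) of what enabling JsonReadFeature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER buys: Jackson will accept non-standard escapes such as \. inside JSON field names, which the field-name payloads added later in this commit rely on.

import com.fasterxml.jackson.core.json.JsonReadFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;
import java.util.Map;

class BackslashEscapeSketch {
  public static void main(String[] args) throws Exception {
    // With the feature enabled, the non-standard escape \. parses to a plain dot,
    // so the key below becomes "a.b"; with it disabled, parsing would fail.
    ObjectMapper mapper =
        JsonMapper.builder().enable(JsonReadFeature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER).build();
    Map<?, ?> row = mapper.readValue("{\"a\\.b\": 1}", Map.class);
    System.out.println(row); // {a.b=1}
  }
}
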
@@ -563,6 +567,11 @@ protected void testIcebergIngestAndQuery(
.usingComparatorForType(BigDecimal::compareTo, BigDecimal.class)
.usingRecursiveComparison()
.isEqualTo(expectedValue);
} else if (expectedValue instanceof Map) {
Assertions.assertThat(objectMapper.readTree((String) res))
.isEqualTo(objectMapper.valueToTree(expectedValue));
} else if (expectedValue instanceof Timestamp) {
Assertions.assertThat(res.toString()).isEqualTo(expectedValue.toString());
} else {
Assertions.assertThat(res).isEqualTo(expectedValue);
}
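
As a side note, a minimal sketch (not from the commit) of the Map comparison added above: both sides are normalized to Jackson JsonNode trees so the JSON string returned by the query can be compared structurally, independent of key order, against the expected Java Map; the Timestamp branch similarly falls back to comparing string renderings.

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Collections;
import org.assertj.core.api.Assertions;

class TreeComparisonSketch {
  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper();
    // The query returns the structured value as a JSON string ...
    JsonNode actual = mapper.readTree("{\"key1\": 1}");
    // ... and the expected value is an in-memory Map converted to the same tree form.
    JsonNode expected = mapper.valueToTree(Collections.singletonMap("key1", 1));
    Assertions.assertThat(actual).isEqualTo(expected);
  }
}
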
@@ -1,3 +1,7 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal.datatypes;

import java.sql.Date;
@@ -43,8 +43,7 @@ public void before() throws Exception {
@Test
public void testStructuredDataType() throws Exception {
assertStructuredDataType(
"object(a int, A int, \"b.b\" string, b_x2Eb boolean)",
"{\"a\": 1, \"A\": 1, \"b.b\": \"test\", \"b_x2Eb\": true}");
"object(a int, b string, c boolean)", "{\"a\": 1, \"b\": \"test\", \"c\": true}");
assertStructuredDataType("map(string, int)", "{\"key1\": 1}");
assertStructuredDataType("array(int)", "[1, 2, 3]");
assertMap(
@@ -118,6 +117,105 @@ public void testNestedDataType() throws Exception {
"{\"a\": 1, \"b\": [1, 2, 3], \"c\": {\"key1\": 1}}");
}

@Test
public void testFieldName() throws Exception {
Iterable<Object> val =
(Iterable<Object>)
objectMapper.readValue(
"["
+ "{\"test\": 1, \"TEST\": 2, \"TeSt\": 3},"
+ "{\"test\": 4, \"TEST\": 5, \"TeSt\": 6},"
+ "{\"test\": 7, \"TEST\": 8, \"TeSt\": 9}"
+ "]",
Object.class);
testIcebergIngestAndQuery(
"object(test int, TEST int, TeSt int)", val, "select {columnName} from {tableName}", val);

/* Single row test, check EP info */
objectMapper.readValue("[{\"test\\.test\": 1, \"TEST\": 2, \"TeSt\": 3}]", Object.class);
val =
(Iterable<Object>)
objectMapper.readValue(
"[{\"obj\\.obj\": false, \"test_test\": 1, \"test_x5Ftest\": 2, \"obj\\\\.obj\":"
+ " 3.0, \"❄️\": 4.0, \"5566\": \"5.0\", \"_5566\": \"6.0\", \"_\":"
+ " \"41424344454647484950\", \"_x27_x44_xFE_x0F\": \"41424344\", \"\\\"\":"
+ " \"2024-01-01\", \"\\\\\": \"12:00:00\", \"\":"
+ " \"2024-01-01T12:00:00.000000\", \"временнаяметка समयमोहर 時間戳記 ㄕˊㄔㄨㄛ 타임스탬프"
+ " タイムスタンプ tidsstämpel\": \"2024-01-01T12:00:00.000000+08:00\", \"\\.\":"
+ " {\"key1\": 1}, \"\\\\.\": [1, 2, 3], \"obj\": {\"obj\": true}}]",
Object.class);
testIcebergIngestAndQuery(
"object("
+ "\"obj.obj\" boolean, "
+ "test_test int, "
+ "test_x5Ftest long, "
+ "\"obj\\.obj\" float, "
+ "\"❄️\" double, "
+ "\"5566\" string, "
+ "_5566 string, "
+ "\"_\" fixed(10), "
+ "_x27_x44_xFE_x0F binary, "
+ "\"\"\"\" date, "
+ "\"\\\" string, "
+ "\"\" string, "
+ "\"временнаяметка समयमोहर 時間戳記 ㄕˊㄔㄨㄛ 타임스탬프 タイムスタンプ tidsstämpel\" string, "
+ "\".\" map(string, int),"
+ "\"\\.\" array(int),"
+ "obj object(obj boolean))",
val,
"select {columnName} from {tableName}",
val);

/* Multiple rows test, check parquet file */
val =
(Iterable<Object>)
objectMapper.readValue(
"[{\"obj\\.obj\": false, \"test_test\": 1, \"test_x5Ftest\": 2, \"obj\\\\.obj\":"
+ " 3.0, \"❄️\": 4.0, \"5566\": \"5.0\", \"_5566\": \"6.0\", \"_\":"
+ " \"41424344454647484950\", \"_x27_x44_xFE_x0F\": \"41424344\", \"\\\"\":"
+ " \"2024-01-01\", \"\\\\\": \"12:00:00\", \"\":"
+ " \"2024-01-01T12:00:00.000000\", \"временнаяметка समयमोहर 時間戳記 ㄕˊㄔㄨㄛ 타임스탬프"
+ " タイムスタンプ tidsstämpel\": \"2024-01-01T12:00:00.000000+08:00\", \"\\.\":"
+ " {\"key1\": 1}, \"\\\\.\": [1, 2, 3], \"obj\": {\"obj\":"
+ " true}},{\"obj\\.obj\": true, \"test_test\": 2, \"test_x5Ftest\": 3,"
+ " \"obj\\\\.obj\": 4.0, \"❄️\": 5.0, \"5566\": \"6.0\", \"_5566\": \"7.0\","
+ " \"_\": \"51525354555657585960\", \"_x27_x44_xFE_x0F\": \"51525354\","
+ " \"\\\"\": \"2024-01-02\", \"\\\\\": \"13:00:00\", \"\":"
+ " \"2024-01-02T13:00:00.000000\", \"временнаяметка समयमोहर 時間戳記 ㄕˊㄔㄨㄛ 타임스탬프"
+ " タイムスタンプ tidsstämpel\": \"2024-01-02T13:00:00.000000+08:00\", \"\\.\":"
+ " {\"key2\": 2}, \"\\\\.\": [4, 5, 6], \"obj\": {\"obj\":"
+ " false}},{\"obj\\.obj\": false, \"test_test\": 3, \"test_x5Ftest\": 4,"
+ " \"obj\\\\.obj\": 5.0, \"❄️\": 6.0, \"5566\": \"7.0\", \"_5566\": \"8.0\","
+ " \"_\": \"61626364656667686970\", \"_x27_x44_xFE_x0F\": \"61626364\","
+ " \"\\\"\": \"2024-01-03\", \"\\\\\": \"14:00:00\", \"\":"
+ " \"2024-01-03T14:00:00.000000\", \"временнаяметка समयमोहर 時間戳記 ㄕˊㄔㄨㄛ 타임스탬프"
+ " タイムスタンプ tidsstämpel\": \"2024-01-03T14:00:00.000000+08:00\", \"\\.\":"
+ " {\"key3\": 3}, \"\\\\.\": [7, 8, 9], \"obj\": {\"obj\": true}}]",
Object.class);

testIcebergIngestAndQuery(
"object("
+ "\"obj.obj\" boolean, "
+ "test_test int, "
+ "test_x5Ftest long, "
+ "\"obj\\.obj\" float, "
+ "\"❄️\" double, "
+ "\"5566\" string, "
+ "_5566 string, "
+ "\"_\" fixed(10), "
+ "_x27_x44_xFE_x0F binary, "
+ "\"\"\"\" date, "
+ "\"\\\" string, "
+ "\"\" string, "
+ "\"временнаяметка समयमोहर 時間戳記 ㄕˊㄔㄨㄛ 타임스탬프 タイムスタンプ tidsstämpel\" string, "
+ "\".\" map(string, int),"
+ "\"\\.\" array(int),"
+ "obj object(obj boolean))",
val,
"select {columnName} from {tableName}",
val);
}

private void assertStructuredDataType(String dataType, String value) throws Exception {
String tableName = createIcebergTable(dataType);
String offsetToken = UUID.randomUUID().toString();
