From 1f8845ad6d697697dd103719c4c07ee08497e4b3 Mon Sep 17 00:00:00 2001
From: Alec Huang
Date: Thu, 7 Nov 2024 12:53:56 -0800
Subject: [PATCH] Fix EP column display names for non-structured columns and
 centralize Iceberg field name sanitization

For non-structured columns the server-side EP metadata check matches on the
column display name, so register RowBufferStats under the original column
name instead of the dot path; structured data types are checked by fieldId
and ordinal, where the display name does not matter. Also centralize the
empty-field-name escaping into IcebergDataTypeParser#sanitizeFieldName and
parameterize ColumnNamesIT so the same tests can run against Iceberg tables
once Iceberg testing is set up on all test deployments.
---
 .../streaming/internal/ParquetRowBuffer.java  | 16 +++-
 .../ingest/utils/IcebergDataTypeParser.java   | 20 +++--
 .../datatypes/AbstractDataTypeTest.java       | 20 +++-
 .../streaming/internal/it/ColumnNamesIT.java  | 76 +++++++++++++++----
 4 files changed, 103 insertions(+), 29 deletions(-)

diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java
index 92fb1755f..f5654d07b 100644
--- a/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java
+++ b/src/main/java/net/snowflake/ingest/streaming/internal/ParquetRowBuffer.java
@@ -219,14 +219,24 @@ public void setupSchema(List<ColumnMetadata> columns) {
         isInRepeatedGroup = true;
       }
 
+      boolean isPrimitiveColumn = path.length == 1;
+
       /* set fieldId to 0 for non-structured columns */
-      int fieldId = path.length == 1 ? 0 : primitiveType.getId().intValue();
+      int fieldId = isPrimitiveColumn ? 0 : primitiveType.getId().intValue();
       int ordinal = schema.getType(columnDescriptor.getPath()[0]).getId().intValue();
+
+      /*
+       * For non-structured columns, the server side checks EP metadata by columnDisplayName,
+       * so set it to the original column name to avoid EP validation errors. Structured data
+       * types are checked by fieldId and ordinal, where columnDisplayName does not matter.
+       */
+      String columnDisplayName =
+          isPrimitiveColumn ? columns.get(ordinal - 1).getName() : columnDotPath;
+
       this.statsMap.put(
           primitiveType.getId().toString(),
           new RowBufferStats(
-              columnDotPath,
+              columnDisplayName,
               null /* collationDefinitionString */,
               ordinal,
               fieldId,
@@ -239,7 +249,7 @@ public void setupSchema(List<ColumnMetadata> columns) {
       this.tempStatsMap.put(
           primitiveType.getId().toString(),
           new RowBufferStats(
-              columnDotPath,
+              columnDisplayName,
               null /* collationDefinitionString */,
               ordinal,
               fieldId,
diff --git a/src/main/java/net/snowflake/ingest/utils/IcebergDataTypeParser.java b/src/main/java/net/snowflake/ingest/utils/IcebergDataTypeParser.java
index d8b8fce2d..2afdabdaf 100644
--- a/src/main/java/net/snowflake/ingest/utils/IcebergDataTypeParser.java
+++ b/src/main/java/net/snowflake/ingest/utils/IcebergDataTypeParser.java
@@ -68,6 +68,7 @@ public static org.apache.parquet.schema.Type parseIcebergDataTypeStringToParquetType(
       String name) {
     Type icebergType = deserializeIcebergType(icebergDataType);
     org.apache.parquet.schema.Type parquetType;
+    name = sanitizeFieldName(name);
     if (icebergType.isPrimitiveType()) {
       parquetType =
           typeToMessageType.primitive(icebergType.asPrimitiveType(), repetition, id, name);
@@ -163,13 +164,7 @@ public static Type getTypeFromJson(@Nonnull JsonNode jsonNode) {
 
       int id = JsonUtil.getInt(ID, field);
 
-      /* TypeToMessageType throws on empty field name, use a backslash to represent it and escape remaining backslash. */
-      String name =
-          JsonUtil.getString(NAME, field)
-              .replace(EMPTY_FIELD_CHAR, EMPTY_FIELD_CHAR + EMPTY_FIELD_CHAR);
-      if (name.isEmpty()) {
-        name = EMPTY_FIELD_CHAR;
-      }
+      String name = sanitizeFieldName(JsonUtil.getStringOrNull(NAME, field));
       Type type = getTypeFromJson(field.get(TYPE));
 
       String doc = JsonUtil.getStringOrNull(DOC, field);
@@ -280,7 +275,7 @@ private static org.apache.parquet.schema.Type replaceWithOriginalFieldName(
               parquetFieldType,
               icebergField.type(),
               icebergField.name().equals(EMPTY_FIELD_CHAR)
-                  ? "" /* Empty string are encoded as single backslash in #structFromJson. Decode them here. */
+                  ? "" /* Empty strings are encoded as a single backslash in #sanitizeFieldName. Decode them here. */
                   : icebergField
                       .name()
                       .replace(EMPTY_FIELD_CHAR + EMPTY_FIELD_CHAR, EMPTY_FIELD_CHAR)));
@@ -293,4 +288,13 @@ private static org.apache.parquet.schema.Type replaceWithOriginalFieldName(
     }
     return builder.as(parquetType.getLogicalTypeAnnotation()).named(fieldName);
   }
+
+  /* TypeToMessageType throws on an empty field name; use a single backslash to represent it and escape any remaining backslashes. */
+  private static String sanitizeFieldName(String fieldName) {
+    String name = fieldName.replace(EMPTY_FIELD_CHAR, EMPTY_FIELD_CHAR + EMPTY_FIELD_CHAR);
+    if (name.isEmpty()) {
+      name = EMPTY_FIELD_CHAR;
+    }
+    return name;
+  }
 }
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java
index de1a7051c..ba904dcfe 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java
@@ -179,15 +179,25 @@ protected String createIcebergTable(String dataType) throws SQLException {
     conn.createStatement()
         .execute(
             String.format(
-                "create or replace iceberg table %s (%s string, %s %s) "
-                    + "catalog = 'SNOWFLAKE' "
-                    + "external_volume = 'streaming_ingest' "
-                    + "base_location = '%s';",
-                tableName, SOURCE_COLUMN_NAME, VALUE_COLUMN_NAME, dataType, baseLocation));
+                "create or replace iceberg table %s (%s string, %s %s) %s",
+                tableName,
+                SOURCE_COLUMN_NAME,
+                VALUE_COLUMN_NAME,
+                dataType,
+                getIcebergTableConfig(tableName)));
 
     return tableName;
   }
 
+  protected String getIcebergTableConfig(String tableName) {
+    String baseLocation = String.format("SDK_IT/%s/%s", databaseName, tableName);
+    return String.format(
+        "catalog = 'SNOWFLAKE' "
+            + "external_volume = 'streaming_ingest' "
+            + "base_location = '%s'",
+        baseLocation);
+  }
+
   protected String getRandomIdentifier() {
     return String.format("test_%s", UUID.randomUUID()).replace('-', '_');
   }
diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/it/ColumnNamesIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/it/ColumnNamesIT.java
index fb253e5ea..d875ab71c 100644
--- a/src/test/java/net/snowflake/ingest/streaming/internal/it/ColumnNamesIT.java
+++ b/src/test/java/net/snowflake/ingest/streaming/internal/it/ColumnNamesIT.java
@@ -1,3 +1,7 @@
+/*
+ * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
+ */
+
 package net.snowflake.ingest.streaming.internal.it;
 
 import java.sql.ResultSet;
@@ -13,17 +17,45 @@
 import net.snowflake.ingest.streaming.OpenChannelRequest;
 import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;
 import net.snowflake.ingest.streaming.internal.datatypes.AbstractDataTypeTest;
+import net.snowflake.ingest.utils.Constants;
 import net.snowflake.ingest.utils.SFException;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runners.Parameterized;
 
 public class ColumnNamesIT extends AbstractDataTypeTest {
   private static final int INGEST_VALUE = 1;
 
+  @Parameterized.Parameters(
+      name = "enableIcebergStreaming={0}, compressionAlgorithm={1}, icebergSerializationPolicy={2}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+      // TODO: Enable this after Iceberg testing is set up on sfctest0 GCP / Azure
+      // {true, "ZSTD", Constants.IcebergSerializationPolicy.COMPATIBLE},
+      // {true, "ZSTD", Constants.IcebergSerializationPolicy.OPTIMIZED},
+      {false, "GZIP", null},
+      {false, "ZSTD", null},
+    };
+  }
+
+  @Parameterized.Parameter(0)
+  public static boolean enableIcebergStreaming;
+
+  @Parameterized.Parameter(1)
+  public static String compressionAlgorithm;
+
+  @Parameterized.Parameter(2)
+  public static Constants.IcebergSerializationPolicy icebergSerializationPolicy;
+
   @Before
   public void before() throws Exception {
-    super.before();
+    if (enableIcebergStreaming) {
+      super.beforeIceberg(compressionAlgorithm, icebergSerializationPolicy);
+    } else {
+      super.compressionAlgorithm = compressionAlgorithm;
+      super.before();
+    }
   }
 
   @Test
@@ -93,9 +125,11 @@ public void testNullableResolution() throws Exception {
     conn.createStatement()
         .execute(
             String.format(
-                "create or replace table %s (AbC int, \"AbC\" int, \"abC\" int, ab\\ c int, \"Ab"
-                    + " c\" int);",
-                tableName));
+                "create or replace %s table %s (AbC int, \"AbC\" int, \"abC\" int, ab\\ c int, \"Ab"
+                    + " c\" int) %s",
+                enableIcebergStreaming ? "iceberg" : "",
+                tableName,
+                enableIcebergStreaming ? getIcebergTableConfig(tableName) : ""));
     SnowflakeStreamingIngestChannel channel = openChannel(tableName);
     String offsetToken = "token1";
     channel.insertRow(new HashMap<>(), offsetToken);
@@ -119,7 +153,12 @@ public void testNullableResolution() throws Exception {
   public void testExtraColNames() throws Exception {
     String tableName = "t1";
     conn.createStatement()
-        .execute(String.format("create or replace table %s (\"create\" int);", tableName));
+        .execute(
+            String.format(
+                "create or replace %s table %s (\"create\" int) %s;",
+                enableIcebergStreaming ? "iceberg" : "",
+                tableName,
+                enableIcebergStreaming ? getIcebergTableConfig(tableName) : ""));
 
     SnowflakeStreamingIngestChannel channel =
         openChannel(tableName, OpenChannelRequest.OnErrorOption.CONTINUE);
@@ -152,9 +191,11 @@ public void testMissingNotNullColNames() throws Exception {
     conn.createStatement()
         .execute(
            String.format(
-                "create or replace table %s (\"CrEaTe\" int not null, a int not null, \"a\" int not"
-                    + " null, \"create\" int);",
-                tableName));
+                "create or replace %s table %s (\"CrEaTe\" int not null, a int not null, \"a\" int"
+                    + " not null, \"create\" int) %s;",
+                enableIcebergStreaming ? "iceberg" : "",
+                tableName,
+                enableIcebergStreaming ? getIcebergTableConfig(tableName) : ""));
 
     SnowflakeStreamingIngestChannel channel =
         openChannel(tableName, OpenChannelRequest.OnErrorOption.CONTINUE);
@@ -174,9 +215,11 @@ public void testNullValuesForNotNullColNames() throws Exception {
     conn.createStatement()
         .execute(
            String.format(
-                "create or replace table %s (col1 int not null, a int not null, \"a\" int not"
-                    + " null, col2 int not null, \"col3\" int);",
-                tableName));
+                "create or replace %s table %s (col1 int not null, a int not null, \"a\" int not"
+                    + " null, col2 int not null, \"col3\" int) %s;",
+                enableIcebergStreaming ? "iceberg" : "",
+                tableName,
+                enableIcebergStreaming ? getIcebergTableConfig(tableName) : ""));
 
     SnowflakeStreamingIngestChannel channel =
         openChannel(tableName, OpenChannelRequest.OnErrorOption.CONTINUE);
@@ -222,7 +265,9 @@ private void testColumnNameSupported(String createTableColumnName, String ingest
     }
     Assert.assertEquals(1, count);
 
-    conn.createStatement().execute(String.format("alter table %s migrate;", tableName));
+    if (!enableIcebergStreaming) {
+      conn.createStatement().execute(String.format("alter table %s migrate;", tableName));
+    }
   }
 
   private void testColumnNameSupported(String column) throws SQLException, InterruptedException {
@@ -252,7 +297,12 @@ private void testInsertRowFails(
   private String createSimpleTable(String createTableColumnName) throws SQLException {
     String tableName = "a" + UUID.randomUUID().toString().replace("-", "_");
     String createTableSql =
-        String.format("create table %s (%s int);", tableName, createTableColumnName);
+        String.format(
+            "create %s table %s (%s int) %s",
+            enableIcebergStreaming ? "iceberg" : "",
+            tableName,
+            createTableColumnName,
+            enableIcebergStreaming ? getIcebergTableConfig(tableName) : "");
     conn.createStatement().execute(createTableSql);
     return tableName;
   }