From c939dc2d4a54c14382e43e0210c7cbb26ab18147 Mon Sep 17 00:00:00 2001 From: Alec Huang Date: Wed, 13 Nov 2024 15:41:19 -0800 Subject: [PATCH] address comments --- .../datatypes/AbstractDataTypeTest.java | 33 +++-- .../internal/it/IcebergSchemaEvolutionIT.java | 120 +++++++++++++++--- 2 files changed, 123 insertions(+), 30 deletions(-) diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java index 9c9688efe..b2c97bae8 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java @@ -6,7 +6,6 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.json.JsonReadFeature; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.json.JsonMapper; import java.math.BigDecimal; @@ -17,12 +16,12 @@ import java.sql.SQLException; import java.sql.Timestamp; import java.time.ZoneId; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Properties; import java.util.Set; import java.util.UUID; import java.util.function.Predicate; @@ -32,6 +31,7 @@ import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient; import net.snowflake.ingest.utils.Constants; import net.snowflake.ingest.utils.SFException; +import org.apache.commons.lang3.StringUtils; import org.assertj.core.api.Assertions; import org.junit.After; import org.junit.Assert; @@ -552,36 +552,41 @@ protected void testIcebergIngestAndQuery( protected void verifyMultipleColumns( String tableName, + SnowflakeStreamingIngestChannel channel, List> values, List> expectedValues, String orderBy) throws Exception { - SnowflakeStreamingIngestChannel channel = openChannel(tableName); + Set keySet = new HashSet<>(); + String offsetToken = null; for (Map value : values) { - String offsetToken = UUID.randomUUID().toString(); + offsetToken = UUID.randomUUID().toString(); channel.insertRow(value, offsetToken); - TestUtils.waitForOffset(channel, offsetToken); } + TestUtils.waitForOffset(channel, offsetToken); for (Map value : expectedValues) { keySet.addAll(value.keySet()); } - for (String key : keySet) { - String query = String.format("select %s from %s order by %s", key, tableName, orderBy); - ResultSet resultSet = conn.createStatement().executeQuery(query); + List keyList = new ArrayList<>(keySet); + String query = + String.format( + "select %s from %s order by %s", StringUtils.join(keyList, ", "), tableName, orderBy); + ResultSet resultSet = conn.createStatement().executeQuery(query); - for (Map expectedValue : expectedValues) { - Assertions.assertThat(resultSet.next()).isTrue(); - Object res = resultSet.getObject(1); - assertEqualValues(expectedValue.get(key), res); + for (Map expectedValue : expectedValues) { + Assertions.assertThat(resultSet.next()).isTrue(); + for (String key : keyList) { + assertEqualValues(expectedValue.get(key), resultSet.getObject(keyList.indexOf(key) + 1)); } - Assertions.assertThat(resultSet.next()).isFalse(); } + Assertions.assertThat(resultSet.next()).isFalse(); } - private void assertEqualValues(Object expectedValue, Object actualValue) throws JsonProcessingException { + private void assertEqualValues(Object expectedValue, Object actualValue) + throws JsonProcessingException { if (expectedValue instanceof BigDecimal) { Assertions.assertThat(actualValue) .usingComparatorForType(BigDecimal::compareTo, BigDecimal.class) diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/it/IcebergSchemaEvolutionIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/it/IcebergSchemaEvolutionIT.java index 2d307cc02..56a26a414 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/it/IcebergSchemaEvolutionIT.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/it/IcebergSchemaEvolutionIT.java @@ -8,29 +8,36 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import net.snowflake.ingest.IcebergIT; +import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel; import net.snowflake.ingest.streaming.internal.datatypes.AbstractDataTypeTest; import net.snowflake.ingest.utils.Constants; +import net.snowflake.ingest.utils.ErrorCode; +import net.snowflake.ingest.utils.SFException; +import org.assertj.core.api.Assertions; import org.junit.Before; import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +@Category(IcebergIT.class) +@RunWith(Parameterized.class) public class IcebergSchemaEvolutionIT extends AbstractDataTypeTest { - @Parameterized.Parameters(name = "compressionAlgorithm={0}, icebergSerializationPolicy={1}") - public static Object[][] parameters() { - return new Object[][] { - {"ZSTD", Constants.IcebergSerializationPolicy.COMPATIBLE}, - {"ZSTD", Constants.IcebergSerializationPolicy.OPTIMIZED} + @Parameterized.Parameters(name = "icebergSerializationPolicy={0}") + public static Object[] parameters() { + return new Object[] { + Constants.IcebergSerializationPolicy.COMPATIBLE, + Constants.IcebergSerializationPolicy.OPTIMIZED }; } - @Parameterized.Parameter public static String compressionAlgorithm; - - @Parameterized.Parameter(1) + @Parameterized.Parameter public static Constants.IcebergSerializationPolicy icebergSerializationPolicy; @Before public void before() throws Exception { - super.beforeIceberg(compressionAlgorithm, icebergSerializationPolicy); + super.setUp(true, "ZSTD", icebergSerializationPolicy); } @Test @@ -39,6 +46,7 @@ public void testPrimitiveColumns() throws Exception { createIcebergTableWithColumns( "id int, int_col int, string_col string, double_col double, boolean_col boolean, " + " binary_col binary"); + SnowflakeStreamingIngestChannel channel = openChannel(tableName); Map value = new HashMap<>(); value.put("id", 0L); value.put("int_col", 1L); @@ -47,7 +55,11 @@ public void testPrimitiveColumns() throws Exception { value.put("boolean_col", true); value.put("binary_col", "4".getBytes()); verifyMultipleColumns( - tableName, Collections.singletonList(value), Collections.singletonList(value), "id"); + tableName, + channel, + Collections.singletonList(value), + Collections.singletonList(value), + "id"); conn.createStatement() .execute( @@ -66,18 +78,45 @@ public void testPrimitiveColumns() throws Exception { newValue.put("new_string_col", "7"); newValue.put("new_boolean_col", true); newValue.put("new_binary_col", "8".getBytes()); + Assertions.assertThatThrownBy( + () -> + verifyMultipleColumns( + tableName, + channel, + Collections.singletonList(newValue), + Arrays.asList(value, newValue), + "id")) + .isInstanceOf(SFException.class) + .hasMessage( + "The given row cannot be converted to the internal format: " + + "Extra columns: [new_binary_col, new_boolean_col, new_int_col, new_string_col]. " + + "Columns not present in the table shouldn't be specified, rowIndex:0") + .extracting("vendorCode") + .isEqualTo(ErrorCode.INVALID_FORMAT_ROW.getMessageCode()); + channel.close(); + + SnowflakeStreamingIngestChannel newChannel = openChannel(tableName); verifyMultipleColumns( - tableName, Collections.singletonList(newValue), Arrays.asList(value, newValue), "id"); + tableName, + newChannel, + Collections.singletonList(newValue), + Arrays.asList(value, newValue), + "id"); } @Test public void testStructType() throws Exception { String tableName = createIcebergTableWithColumns("id int, object_col object(a int)"); + SnowflakeStreamingIngestChannel channel = openChannel(tableName); Map value = new HashMap<>(); value.put("id", 0L); value.put("object_col", Collections.singletonMap("a", 1)); verifyMultipleColumns( - tableName, Collections.singletonList(value), Collections.singletonList(value), "id"); + tableName, + channel, + Collections.singletonList(value), + Collections.singletonList(value), + "id"); conn.createStatement() .execute( @@ -92,7 +131,6 @@ public void testStructType() throws Exception { put("b", null); } }); - Map newValue = new HashMap<>(); newValue.put("id", 1L); newValue.put( @@ -103,8 +141,30 @@ public void testStructType() throws Exception { put("b", 3); } }); + Assertions.assertThatThrownBy( + () -> + verifyMultipleColumns( + tableName, + channel, + Collections.singletonList(newValue), + Arrays.asList(value, newValue), + "id")) + .isInstanceOf(SFException.class) + .hasMessage( + "The given row cannot be converted to the internal format: Invalid row 0." + + " missingNotNullColNames=null, extraColNames=[OBJECT_COL.b]," + + " nullValueForNotNullColNames=null") + .extracting("vendorCode") + .isEqualTo(ErrorCode.INVALID_FORMAT_ROW.getMessageCode()); + channel.close(); + + SnowflakeStreamingIngestChannel newChannel = openChannel(tableName); verifyMultipleColumns( - tableName, Collections.singletonList(newValue), Arrays.asList(value, newValue), "id"); + tableName, + newChannel, + Collections.singletonList(newValue), + Arrays.asList(value, newValue), + "id"); } @Test @@ -112,6 +172,7 @@ public void testNestedDataType() throws Exception { String tableName = createIcebergTableWithColumns( "id int, object_col object(map_col map(string, array(object(a int))))"); + SnowflakeStreamingIngestChannel channel = openChannel(tableName); Map value = new HashMap<>(); value.put("id", 0L); value.put( @@ -121,7 +182,11 @@ public void testNestedDataType() throws Exception { Collections.singletonMap( "key", Collections.singletonList(Collections.singletonMap("a", 1))))); verifyMultipleColumns( - tableName, Collections.singletonList(value), Collections.singletonList(value), "id"); + tableName, + channel, + Collections.singletonList(value), + Collections.singletonList(value), + "id"); conn.createStatement() .execute( @@ -168,7 +233,30 @@ public void testNestedDataType() throws Exception { put("map_col_2", Collections.singletonMap("key", 4)); } }); + Assertions.assertThatThrownBy( + () -> + verifyMultipleColumns( + tableName, + channel, + Collections.singletonList(newValue), + Arrays.asList(value, newValue), + "id")) + .isInstanceOf(SFException.class) + .hasMessage( + "The given row cannot be converted to the internal format: Invalid row 0." + + " missingNotNullColNames=null," + + " extraColNames=[OBJECT_COL.map_col.key_value.value.list.element.b," + + " OBJECT_COL.map_col_2], nullValueForNotNullColNames=null") + .extracting("vendorCode") + .isEqualTo(ErrorCode.INVALID_FORMAT_ROW.getMessageCode()); + channel.close(); + + SnowflakeStreamingIngestChannel newChannel = openChannel(tableName); verifyMultipleColumns( - tableName, Collections.singletonList(newValue), Arrays.asList(value, newValue), "id"); + tableName, + newChannel, + Collections.singletonList(newValue), + Arrays.asList(value, newValue), + "id"); } }