From 45c17a586638009c8203ba1538837f90d52deaaa Mon Sep 17 00:00:00 2001 From: Alec Huang Date: Thu, 14 Nov 2024 10:16:06 -0800 Subject: [PATCH] Schema evolution test for Iceberg ingestion (#884) --- .../datatypes/AbstractDataTypeTest.java | 83 +++++- .../internal/it/IcebergSchemaEvolutionIT.java | 247 ++++++++++++++++++ 2 files changed, 318 insertions(+), 12 deletions(-) create mode 100644 src/test/java/net/snowflake/ingest/streaming/internal/it/IcebergSchemaEvolutionIT.java diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java index 9a1d3863e..b2c97bae8 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/datatypes/AbstractDataTypeTest.java @@ -4,6 +4,7 @@ package net.snowflake.ingest.streaming.internal.datatypes; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.json.JsonReadFeature; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.json.JsonMapper; @@ -15,9 +16,13 @@ import java.sql.SQLException; import java.sql.Timestamp; import java.time.ZoneId; +import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.UUID; import java.util.function.Predicate; import net.snowflake.ingest.TestUtils; @@ -26,6 +31,7 @@ import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient; import net.snowflake.ingest.utils.Constants; import net.snowflake.ingest.utils.SFException; +import org.apache.commons.lang3.StringUtils; import org.assertj.core.api.Assertions; import org.junit.After; import org.junit.Assert; @@ -128,6 +134,19 @@ protected String createIcebergTable(String dataType) throws SQLException { return tableName; } + protected String createIcebergTableWithColumns(String columns) throws SQLException { + String tableName = getRandomIdentifier(); + String baseLocation = + String.format("SDK_IT/%s/%s/%s", databaseName, columns.replace(" ", "_"), tableName); + conn.createStatement() + .execute( + String.format( + "create or replace iceberg table %s (%s) %s", + tableName, columns, getIcebergTableConfig(tableName))); + + return tableName; + } + protected String getIcebergTableConfig(String tableName) { String baseLocation = String.format("SDK_IT/%s/%s", databaseName, tableName); return String.format( @@ -526,20 +545,60 @@ protected void testIcebergIngestAndQuery( for (Object expectedValue : expectedValues) { Assertions.assertThat(resultSet.next()).isTrue(); Object res = resultSet.getObject(1); - if (expectedValue instanceof BigDecimal) { - Assertions.assertThat(res) - .usingComparatorForType(BigDecimal::compareTo, BigDecimal.class) - .usingRecursiveComparison() - .isEqualTo(expectedValue); - } else if (expectedValue instanceof Map) { - Assertions.assertThat(objectMapper.readTree((String) res)) - .isEqualTo(objectMapper.valueToTree(expectedValue)); - } else if (expectedValue instanceof Timestamp) { - Assertions.assertThat(res.toString()).isEqualTo(expectedValue.toString()); - } else { - Assertions.assertThat(res).isEqualTo(expectedValue); + assertEqualValues(expectedValue, res); + } + Assertions.assertThat(resultSet.next()).isFalse(); + } + + protected void verifyMultipleColumns( + String tableName, + SnowflakeStreamingIngestChannel channel, + List> values, + List> expectedValues, + String orderBy) + throws Exception { + + Set keySet = new HashSet<>(); + String offsetToken = null; + for (Map value : values) { + offsetToken = UUID.randomUUID().toString(); + channel.insertRow(value, offsetToken); + } + TestUtils.waitForOffset(channel, offsetToken); + + for (Map value : expectedValues) { + keySet.addAll(value.keySet()); + } + + List keyList = new ArrayList<>(keySet); + String query = + String.format( + "select %s from %s order by %s", StringUtils.join(keyList, ", "), tableName, orderBy); + ResultSet resultSet = conn.createStatement().executeQuery(query); + + for (Map expectedValue : expectedValues) { + Assertions.assertThat(resultSet.next()).isTrue(); + for (String key : keyList) { + assertEqualValues(expectedValue.get(key), resultSet.getObject(keyList.indexOf(key) + 1)); } } Assertions.assertThat(resultSet.next()).isFalse(); } + + private void assertEqualValues(Object expectedValue, Object actualValue) + throws JsonProcessingException { + if (expectedValue instanceof BigDecimal) { + Assertions.assertThat(actualValue) + .usingComparatorForType(BigDecimal::compareTo, BigDecimal.class) + .usingRecursiveComparison() + .isEqualTo(expectedValue); + } else if (expectedValue instanceof Map) { + Assertions.assertThat(objectMapper.readTree((String) actualValue)) + .isEqualTo(objectMapper.valueToTree(expectedValue)); + } else if (expectedValue instanceof Timestamp) { + Assertions.assertThat(actualValue.toString()).isEqualTo(expectedValue.toString()); + } else { + Assertions.assertThat(actualValue).isEqualTo(expectedValue); + } + } } diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/it/IcebergSchemaEvolutionIT.java b/src/test/java/net/snowflake/ingest/streaming/internal/it/IcebergSchemaEvolutionIT.java new file mode 100644 index 000000000..3021b56ea --- /dev/null +++ b/src/test/java/net/snowflake/ingest/streaming/internal/it/IcebergSchemaEvolutionIT.java @@ -0,0 +1,247 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + +package net.snowflake.ingest.streaming.internal.it; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import net.snowflake.ingest.IcebergIT; +import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel; +import net.snowflake.ingest.streaming.internal.datatypes.AbstractDataTypeTest; +import net.snowflake.ingest.utils.Constants; +import net.snowflake.ingest.utils.ErrorCode; +import net.snowflake.ingest.utils.SFException; +import org.assertj.core.api.Assertions; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@Category(IcebergIT.class) +@RunWith(Parameterized.class) +public class IcebergSchemaEvolutionIT extends AbstractDataTypeTest { + @Parameterized.Parameters(name = "icebergSerializationPolicy={0}") + public static Object[] parameters() { + return new Object[] { + Constants.IcebergSerializationPolicy.COMPATIBLE, + Constants.IcebergSerializationPolicy.OPTIMIZED + }; + } + + @Parameterized.Parameter + public static Constants.IcebergSerializationPolicy icebergSerializationPolicy; + + @Before + public void before() throws Exception { + super.setUp(true, "ZSTD", icebergSerializationPolicy); + } + + @Test + public void testPrimitiveColumns() throws Exception { + String tableName = + createIcebergTableWithColumns( + "id int, int_col int, string_col string, double_col double, boolean_col boolean, " + + " binary_col binary"); + SnowflakeStreamingIngestChannel channel = openChannel(tableName); + Map value = new HashMap<>(); + value.put("id", 0L); + value.put("int_col", 1L); + value.put("string_col", "2"); + value.put("double_col", 3.0); + value.put("boolean_col", true); + value.put("binary_col", "4".getBytes()); + verifyMultipleColumns( + tableName, + channel, + Collections.singletonList(value), + Collections.singletonList(value), + "id"); + + conn.createStatement() + .execute( + String.format( + "ALTER ICEBERG TABLE %s ADD COLUMN new_int_col int, new_string_col string," + + " new_boolean_col boolean, new_binary_col binary", + tableName)); + Map newValue = new HashMap<>(); + newValue.put("id", 1L); + newValue.put("int_col", 2L); + newValue.put("string_col", "3"); + newValue.put("double_col", 4.0); + newValue.put("boolean_col", false); + newValue.put("binary_col", "5".getBytes()); + newValue.put("new_int_col", 6L); + newValue.put("new_string_col", "7"); + newValue.put("new_boolean_col", true); + newValue.put("new_binary_col", "8".getBytes()); + Assertions.assertThatThrownBy( + () -> + verifyMultipleColumns( + tableName, + channel, + Collections.singletonList(newValue), + Arrays.asList(value, newValue), + "id")) + .isInstanceOf(SFException.class) + .hasMessage( + "The given row cannot be converted to the internal format: " + + "Extra columns: [new_binary_col, new_boolean_col, new_int_col, new_string_col]. " + + "Columns not present in the table shouldn't be specified, rowIndex:0") + .extracting("vendorCode") + .isEqualTo(ErrorCode.INVALID_FORMAT_ROW.getMessageCode()); + channel.close(); + + SnowflakeStreamingIngestChannel newChannel = openChannel(tableName); + verifyMultipleColumns( + tableName, + newChannel, + Collections.singletonList(newValue), + Arrays.asList(value, newValue), + "id"); + } + + @Test + public void testStructType() throws Exception { + String tableName = createIcebergTableWithColumns("id int, object_col object(a int)"); + SnowflakeStreamingIngestChannel channel = openChannel(tableName); + Map value = new HashMap<>(); + value.put("id", 0L); + value.put("object_col", ImmutableMap.of("a", 1)); + verifyMultipleColumns( + tableName, + channel, + Collections.singletonList(value), + Collections.singletonList(value), + "id"); + + conn.createStatement() + .execute( + String.format( + "ALTER ICEBERG TABLE %s ALTER COLUMN object_col SET DATA TYPE object(a int, b int)", + tableName)); + value.put( + "object_col", + Collections.unmodifiableMap( + new HashMap() { + { + put("a", 1); + put("b", null); + } + })); + Map newValue = new HashMap<>(); + newValue.put("id", 1L); + newValue.put("object_col", ImmutableMap.of("a", 2, "b", 3)); + Assertions.assertThatThrownBy( + () -> + verifyMultipleColumns( + tableName, + channel, + Collections.singletonList(newValue), + Arrays.asList(value, newValue), + "id")) + .isInstanceOf(SFException.class) + .hasMessage( + "The given row cannot be converted to the internal format: Invalid row 0." + + " missingNotNullColNames=null, extraColNames=[OBJECT_COL.b]," + + " nullValueForNotNullColNames=null") + .extracting("vendorCode") + .isEqualTo(ErrorCode.INVALID_FORMAT_ROW.getMessageCode()); + channel.close(); + + SnowflakeStreamingIngestChannel newChannel = openChannel(tableName); + verifyMultipleColumns( + tableName, + newChannel, + Collections.singletonList(newValue), + Arrays.asList(value, newValue), + "id"); + } + + @Test + public void testNestedDataType() throws Exception { + String tableName = + createIcebergTableWithColumns( + "id int, object_col object(map_col map(string, array(object(a int))))"); + SnowflakeStreamingIngestChannel channel = openChannel(tableName); + Map value = new HashMap<>(); + value.put("id", 0L); + value.put( + "object_col", + ImmutableMap.of( + "map_col", ImmutableMap.of("key", ImmutableList.of((ImmutableMap.of("a", 1)))))); + verifyMultipleColumns( + tableName, + channel, + Collections.singletonList(value), + Collections.singletonList(value), + "id"); + + conn.createStatement() + .execute( + String.format( + "ALTER ICEBERG TABLE %s ALTER COLUMN object_col SET DATA TYPE object(map_col" + + " map(string, array(object(a int, b int))), map_col_2 map(string, int))", + tableName)); + value.put( + "object_col", + Collections.unmodifiableMap( + new HashMap() { + { + put( + "map_col", + ImmutableMap.of( + "key", + ImmutableList.of( + Collections.unmodifiableMap( + new HashMap() { + { + put("a", 1); + put("b", null); + } + })))); + put("map_col_2", null); + } + })); + + Map newValue = new HashMap<>(); + newValue.put("id", 1L); + newValue.put( + "object_col", + ImmutableMap.of( + "map_col", + ImmutableMap.of("key", ImmutableList.of(ImmutableMap.of("a", 2, "b", 3))), + "map_col_2", + ImmutableMap.of("key", 4))); + Assertions.assertThatThrownBy( + () -> + verifyMultipleColumns( + tableName, + channel, + Collections.singletonList(newValue), + Arrays.asList(value, newValue), + "id")) + .isInstanceOf(SFException.class) + .hasMessage( + "The given row cannot be converted to the internal format: Invalid row 0." + + " missingNotNullColNames=null," + + " extraColNames=[OBJECT_COL.map_col.key_value.value.list.element.b," + + " OBJECT_COL.map_col_2], nullValueForNotNullColNames=null") + .extracting("vendorCode") + .isEqualTo(ErrorCode.INVALID_FORMAT_ROW.getMessageCode()); + channel.close(); + + SnowflakeStreamingIngestChannel newChannel = openChannel(tableName); + verifyMultipleColumns( + tableName, + newChannel, + Collections.singletonList(newValue), + Arrays.asList(value, newValue), + "id"); + } +}