Skip to content

Commit

Permalink
Schema evolution test for Iceberg ingestion (#884)
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-alhuang authored Nov 14, 2024
1 parent 84c8c66 commit 45c17a5
Show file tree
Hide file tree
Showing 2 changed files with 318 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package net.snowflake.ingest.streaming.internal.datatypes;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.json.JsonReadFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;
Expand All @@ -15,9 +16,13 @@
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.function.Predicate;
import net.snowflake.ingest.TestUtils;
Expand All @@ -26,6 +31,7 @@
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.SFException;
import org.apache.commons.lang3.StringUtils;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
Expand Down Expand Up @@ -128,6 +134,19 @@ protected String createIcebergTable(String dataType) throws SQLException {
return tableName;
}

/**
 * Creates (or replaces) an Iceberg table with the given column definition list and returns the
 * generated table name.
 *
 * @param columns comma-separated column definitions, e.g. {@code "id int, name string"}
 * @return the randomly generated table name
 * @throws SQLException if the DDL statement fails
 */
protected String createIcebergTableWithColumns(String columns) throws SQLException {
  String tableName = getRandomIdentifier();
  // NOTE(review): removed an unused `baseLocation` local that was computed here but never
  // referenced — getIcebergTableConfig derives the base location from the table name itself.
  conn.createStatement()
      .execute(
          String.format(
              "create or replace iceberg table %s (%s) %s",
              tableName, columns, getIcebergTableConfig(tableName)));

  return tableName;
}

protected String getIcebergTableConfig(String tableName) {
String baseLocation = String.format("SDK_IT/%s/%s", databaseName, tableName);
return String.format(
Expand Down Expand Up @@ -526,20 +545,60 @@ protected void testIcebergIngestAndQuery(
for (Object expectedValue : expectedValues) {
Assertions.assertThat(resultSet.next()).isTrue();
Object res = resultSet.getObject(1);
if (expectedValue instanceof BigDecimal) {
Assertions.assertThat(res)
.usingComparatorForType(BigDecimal::compareTo, BigDecimal.class)
.usingRecursiveComparison()
.isEqualTo(expectedValue);
} else if (expectedValue instanceof Map) {
Assertions.assertThat(objectMapper.readTree((String) res))
.isEqualTo(objectMapper.valueToTree(expectedValue));
} else if (expectedValue instanceof Timestamp) {
Assertions.assertThat(res.toString()).isEqualTo(expectedValue.toString());
} else {
Assertions.assertThat(res).isEqualTo(expectedValue);
assertEqualValues(expectedValue, res);
}
Assertions.assertThat(resultSet.next()).isFalse();
}

/**
 * Inserts the given rows through {@code channel}, waits for the last offset token to commit,
 * then queries {@code tableName} back and asserts each returned row matches the expected rows.
 *
 * <p>The selected column list is the union of keys across all expected rows; individual values
 * are compared with the type-aware {@code assertEqualValues} helper.
 *
 * @param tableName table to query back
 * @param channel open streaming ingest channel pointing at {@code tableName}
 * @param values rows to insert
 * @param expectedValues rows expected on query-back, in {@code orderBy} order
 * @param orderBy ORDER BY expression making the result order deterministic
 * @throws Exception on ingest, wait, or query failure
 */
protected void verifyMultipleColumns(
    String tableName,
    SnowflakeStreamingIngestChannel channel,
    List<Map<String, Object>> values,
    List<Map<String, Object>> expectedValues,
    String orderBy)
    throws Exception {

  String offsetToken = null;
  for (Map<String, Object> value : values) {
    offsetToken = UUID.randomUUID().toString();
    channel.insertRow(value, offsetToken);
  }
  // Block until the server has committed the last inserted row.
  TestUtils.waitForOffset(channel, offsetToken);

  // Select the union of all expected columns so rows with differing key sets line up.
  Set<String> keySet = new HashSet<>();
  for (Map<String, Object> value : expectedValues) {
    keySet.addAll(value.keySet());
  }

  List<String> keyList = new ArrayList<>(keySet);
  String query =
      String.format(
          "select %s from %s order by %s", StringUtils.join(keyList, ", "), tableName, orderBy);
  ResultSet resultSet = conn.createStatement().executeQuery(query);

  for (Map<String, Object> expectedValue : expectedValues) {
    Assertions.assertThat(resultSet.next()).isTrue();
    // Iterate by index rather than calling keyList.indexOf(key) per column (was O(n^2)).
    for (int i = 0; i < keyList.size(); i++) {
      assertEqualValues(expectedValue.get(keyList.get(i)), resultSet.getObject(i + 1));
    }
  }
  Assertions.assertThat(resultSet.next()).isFalse();
}

/**
 * Asserts that a value read back from the result set equals the expected value, applying a
 * type-specific comparison strategy based on the expected value's runtime type.
 */
private void assertEqualValues(Object expectedValue, Object actualValue)
    throws JsonProcessingException {
  if (expectedValue instanceof BigDecimal) {
    // Compare numerically so scale differences (e.g. 1.0 vs 1.00) do not fail the check.
    Assertions.assertThat(actualValue)
        .usingComparatorForType(BigDecimal::compareTo, BigDecimal.class)
        .usingRecursiveComparison()
        .isEqualTo(expectedValue);
    return;
  }
  if (expectedValue instanceof Map) {
    // Structured values come back as JSON text; compare parsed trees, not raw strings.
    Assertions.assertThat(objectMapper.readTree((String) actualValue))
        .isEqualTo(objectMapper.valueToTree(expectedValue));
    return;
  }
  if (expectedValue instanceof Timestamp) {
    // Compare canonical string forms to sidestep timestamp class/precision mismatches.
    Assertions.assertThat(actualValue.toString()).isEqualTo(expectedValue.toString());
    return;
  }
  // Fall back to plain equality for everything else.
  Assertions.assertThat(actualValue).isEqualTo(expectedValue);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
/*
* Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal.it;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import net.snowflake.ingest.IcebergIT;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;
import net.snowflake.ingest.streaming.internal.datatypes.AbstractDataTypeTest;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.SFException;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Integration tests verifying that Iceberg table schema evolution (ADD COLUMN / ALTER COLUMN
 * SET DATA TYPE) is observed by the streaming ingest SDK. Each test follows the same shape:
 * ingest into the original schema, evolve the table via DDL, assert that the already-open
 * channel rejects rows that reference the new columns, then reopen a channel and ingest the
 * same row successfully.
 */
@Category(IcebergIT.class)
@RunWith(Parameterized.class)
public class IcebergSchemaEvolutionIT extends AbstractDataTypeTest {
  // Run every test under both serialization policies.
  @Parameterized.Parameters(name = "icebergSerializationPolicy={0}")
  public static Object[] parameters() {
    return new Object[] {
      Constants.IcebergSerializationPolicy.COMPATIBLE,
      Constants.IcebergSerializationPolicy.OPTIMIZED
    };
  }

  // NOTE(review): JUnit 4's @Parameterized.Parameter is documented for non-static public
  // instance fields; this field is static — confirm injection works as intended here.
  @Parameterized.Parameter
  public static Constants.IcebergSerializationPolicy icebergSerializationPolicy;

  @Before
  public void before() throws Exception {
    // Iceberg mode enabled, ZSTD compression, policy from the parameterized runner.
    super.setUp(true, "ZSTD", icebergSerializationPolicy);
  }

  /** Evolves a flat table by adding primitive columns and checks channel behavior. */
  @Test
  public void testPrimitiveColumns() throws Exception {
    String tableName =
        createIcebergTableWithColumns(
            "id int, int_col int, string_col string, double_col double, boolean_col boolean, "
                + " binary_col binary");
    SnowflakeStreamingIngestChannel channel = openChannel(tableName);
    Map<String, Object> value = new HashMap<>();
    value.put("id", 0L);
    value.put("int_col", 1L);
    value.put("string_col", "2");
    value.put("double_col", 3.0);
    value.put("boolean_col", true);
    value.put("binary_col", "4".getBytes());
    // Baseline: the original schema ingests and queries back cleanly.
    verifyMultipleColumns(
        tableName,
        channel,
        Collections.singletonList(value),
        Collections.singletonList(value),
        "id");

    // Evolve the schema server-side while the channel is still open.
    conn.createStatement()
        .execute(
            String.format(
                "ALTER ICEBERG TABLE %s ADD COLUMN new_int_col int, new_string_col string,"
                    + " new_boolean_col boolean, new_binary_col binary",
                tableName));
    Map<String, Object> newValue = new HashMap<>();
    newValue.put("id", 1L);
    newValue.put("int_col", 2L);
    newValue.put("string_col", "3");
    newValue.put("double_col", 4.0);
    newValue.put("boolean_col", false);
    newValue.put("binary_col", "5".getBytes());
    newValue.put("new_int_col", 6L);
    newValue.put("new_string_col", "7");
    newValue.put("new_boolean_col", true);
    newValue.put("new_binary_col", "8".getBytes());
    // The stale channel still holds the pre-evolution schema, so the new columns are
    // rejected as "extra columns" with the exact message asserted below.
    Assertions.assertThatThrownBy(
            () ->
                verifyMultipleColumns(
                    tableName,
                    channel,
                    Collections.singletonList(newValue),
                    Arrays.asList(value, newValue),
                    "id"))
        .isInstanceOf(SFException.class)
        .hasMessage(
            "The given row cannot be converted to the internal format: "
                + "Extra columns: [new_binary_col, new_boolean_col, new_int_col, new_string_col]. "
                + "Columns not present in the table shouldn't be specified, rowIndex:0")
        .extracting("vendorCode")
        .isEqualTo(ErrorCode.INVALID_FORMAT_ROW.getMessageCode());
    channel.close();

    // A freshly opened channel picks up the evolved schema and ingests successfully.
    SnowflakeStreamingIngestChannel newChannel = openChannel(tableName);
    verifyMultipleColumns(
        tableName,
        newChannel,
        Collections.singletonList(newValue),
        Arrays.asList(value, newValue),
        "id");
  }

  /** Evolves a struct (object) column by widening its field list. */
  @Test
  public void testStructType() throws Exception {
    String tableName = createIcebergTableWithColumns("id int, object_col object(a int)");
    SnowflakeStreamingIngestChannel channel = openChannel(tableName);
    Map<String, Object> value = new HashMap<>();
    value.put("id", 0L);
    value.put("object_col", ImmutableMap.of("a", 1));
    verifyMultipleColumns(
        tableName,
        channel,
        Collections.singletonList(value),
        Collections.singletonList(value),
        "id");

    // Widen the struct: object(a int) -> object(a int, b int).
    conn.createStatement()
        .execute(
            String.format(
                "ALTER ICEBERG TABLE %s ALTER COLUMN object_col SET DATA TYPE object(a int, b int)",
                tableName));
    // After evolution the old row reads back with b = null; HashMap (not ImmutableMap) is
    // used because ImmutableMap rejects null values.
    value.put(
        "object_col",
        Collections.unmodifiableMap(
            new HashMap<String, Object>() {
              {
                put("a", 1);
                put("b", null);
              }
            }));
    Map<String, Object> newValue = new HashMap<>();
    newValue.put("id", 1L);
    newValue.put("object_col", ImmutableMap.of("a", 2, "b", 3));
    // Stale channel rejects the new sub-field "b" of the struct column.
    Assertions.assertThatThrownBy(
            () ->
                verifyMultipleColumns(
                    tableName,
                    channel,
                    Collections.singletonList(newValue),
                    Arrays.asList(value, newValue),
                    "id"))
        .isInstanceOf(SFException.class)
        .hasMessage(
            "The given row cannot be converted to the internal format: Invalid row 0."
                + " missingNotNullColNames=null, extraColNames=[OBJECT_COL.b],"
                + " nullValueForNotNullColNames=null")
        .extracting("vendorCode")
        .isEqualTo(ErrorCode.INVALID_FORMAT_ROW.getMessageCode());
    channel.close();

    // Reopened channel sees the widened struct and ingests successfully.
    SnowflakeStreamingIngestChannel newChannel = openChannel(tableName);
    verifyMultipleColumns(
        tableName,
        newChannel,
        Collections.singletonList(newValue),
        Arrays.asList(value, newValue),
        "id");
  }

  /** Evolves a deeply nested object(map(array(object))) column in one ALTER statement. */
  @Test
  public void testNestedDataType() throws Exception {
    String tableName =
        createIcebergTableWithColumns(
            "id int, object_col object(map_col map(string, array(object(a int))))");
    SnowflakeStreamingIngestChannel channel = openChannel(tableName);
    Map<String, Object> value = new HashMap<>();
    value.put("id", 0L);
    value.put(
        "object_col",
        ImmutableMap.of(
            "map_col", ImmutableMap.of("key", ImmutableList.of((ImmutableMap.of("a", 1))))));
    verifyMultipleColumns(
        tableName,
        channel,
        Collections.singletonList(value),
        Collections.singletonList(value),
        "id");

    // Evolve twice in one statement: add field "b" to the innermost object and add a new
    // sibling map field "map_col_2" to the outer object.
    conn.createStatement()
        .execute(
            String.format(
                "ALTER ICEBERG TABLE %s ALTER COLUMN object_col SET DATA TYPE object(map_col"
                    + " map(string, array(object(a int, b int))), map_col_2 map(string, int))",
                tableName));
    // Old row reads back with nulls filled in for both newly added pieces; HashMap is used
    // because ImmutableMap rejects null values.
    value.put(
        "object_col",
        Collections.unmodifiableMap(
            new HashMap<String, Object>() {
              {
                put(
                    "map_col",
                    ImmutableMap.of(
                        "key",
                        ImmutableList.of(
                            Collections.unmodifiableMap(
                                new HashMap<String, Object>() {
                                  {
                                    put("a", 1);
                                    put("b", null);
                                  }
                                }))));
                put("map_col_2", null);
              }
            }));

    Map<String, Object> newValue = new HashMap<>();
    newValue.put("id", 1L);
    newValue.put(
        "object_col",
        ImmutableMap.of(
            "map_col",
            ImmutableMap.of("key", ImmutableList.of(ImmutableMap.of("a", 2, "b", 3))),
            "map_col_2",
            ImmutableMap.of("key", 4)));
    // Stale channel rejects both additions; note the fully qualified nested path in the
    // expected extraColNames message.
    Assertions.assertThatThrownBy(
            () ->
                verifyMultipleColumns(
                    tableName,
                    channel,
                    Collections.singletonList(newValue),
                    Arrays.asList(value, newValue),
                    "id"))
        .isInstanceOf(SFException.class)
        .hasMessage(
            "The given row cannot be converted to the internal format: Invalid row 0."
                + " missingNotNullColNames=null,"
                + " extraColNames=[OBJECT_COL.map_col.key_value.value.list.element.b,"
                + " OBJECT_COL.map_col_2], nullValueForNotNullColNames=null")
        .extracting("vendorCode")
        .isEqualTo(ErrorCode.INVALID_FORMAT_ROW.getMessageCode());
    channel.close();

    // Reopened channel sees the evolved nested schema and ingests successfully.
    SnowflakeStreamingIngestChannel newChannel = openChannel(tableName);
    verifyMultipleColumns(
        tableName,
        newChannel,
        Collections.singletonList(newValue),
        Arrays.asList(value, newValue),
        "id");
  }
}

0 comments on commit 45c17a5

Please sign in to comment.