Skip to content

Commit

Permalink
address comments
Browse files — browse the repository at this point in the history
  • Loading branch information
sfc-gh-alhuang committed Nov 13, 2024
1 parent 9956031 commit c939dc2
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.json.JsonReadFeature;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;
import java.math.BigDecimal;
Expand All @@ -17,12 +16,12 @@
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.function.Predicate;
Expand All @@ -32,6 +31,7 @@
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.SFException;
import org.apache.commons.lang3.StringUtils;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
Expand Down Expand Up @@ -552,36 +552,41 @@ protected void testIcebergIngestAndQuery(

protected void verifyMultipleColumns(
String tableName,
SnowflakeStreamingIngestChannel channel,
List<Map<String, Object>> values,
List<Map<String, Object>> expectedValues,
String orderBy)
throws Exception {
SnowflakeStreamingIngestChannel channel = openChannel(tableName);

Set<String> keySet = new HashSet<>();
String offsetToken = null;
for (Map<String, Object> value : values) {
String offsetToken = UUID.randomUUID().toString();
offsetToken = UUID.randomUUID().toString();
channel.insertRow(value, offsetToken);
TestUtils.waitForOffset(channel, offsetToken);
}
TestUtils.waitForOffset(channel, offsetToken);

for (Map<String, Object> value : expectedValues) {
keySet.addAll(value.keySet());
}

for (String key : keySet) {
String query = String.format("select %s from %s order by %s", key, tableName, orderBy);
ResultSet resultSet = conn.createStatement().executeQuery(query);
List<String> keyList = new ArrayList<>(keySet);
String query =
String.format(
"select %s from %s order by %s", StringUtils.join(keyList, ", "), tableName, orderBy);
ResultSet resultSet = conn.createStatement().executeQuery(query);

for (Map<String, Object> expectedValue : expectedValues) {
Assertions.assertThat(resultSet.next()).isTrue();
Object res = resultSet.getObject(1);
assertEqualValues(expectedValue.get(key), res);
for (Map<String, Object> expectedValue : expectedValues) {
Assertions.assertThat(resultSet.next()).isTrue();
for (String key : keyList) {
assertEqualValues(expectedValue.get(key), resultSet.getObject(keyList.indexOf(key) + 1));
}
Assertions.assertThat(resultSet.next()).isFalse();
}
Assertions.assertThat(resultSet.next()).isFalse();
}

private void assertEqualValues(Object expectedValue, Object actualValue) throws JsonProcessingException {
private void assertEqualValues(Object expectedValue, Object actualValue)
throws JsonProcessingException {
if (expectedValue instanceof BigDecimal) {
Assertions.assertThat(actualValue)
.usingComparatorForType(BigDecimal::compareTo, BigDecimal.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,36 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import net.snowflake.ingest.IcebergIT;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;
import net.snowflake.ingest.streaming.internal.datatypes.AbstractDataTypeTest;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.SFException;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@Category(IcebergIT.class)
@RunWith(Parameterized.class)
public class IcebergSchemaEvolutionIT extends AbstractDataTypeTest {
@Parameterized.Parameters(name = "compressionAlgorithm={0}, icebergSerializationPolicy={1}")
public static Object[][] parameters() {
return new Object[][] {
{"ZSTD", Constants.IcebergSerializationPolicy.COMPATIBLE},
{"ZSTD", Constants.IcebergSerializationPolicy.OPTIMIZED}
@Parameterized.Parameters(name = "icebergSerializationPolicy={0}")
public static Object[] parameters() {
return new Object[] {
Constants.IcebergSerializationPolicy.COMPATIBLE,
Constants.IcebergSerializationPolicy.OPTIMIZED
};
}

@Parameterized.Parameter public static String compressionAlgorithm;

@Parameterized.Parameter(1)
@Parameterized.Parameter
public static Constants.IcebergSerializationPolicy icebergSerializationPolicy;

@Before
public void before() throws Exception {
super.beforeIceberg(compressionAlgorithm, icebergSerializationPolicy);
super.setUp(true, "ZSTD", icebergSerializationPolicy);
}

@Test
Expand All @@ -39,6 +46,7 @@ public void testPrimitiveColumns() throws Exception {
createIcebergTableWithColumns(
"id int, int_col int, string_col string, double_col double, boolean_col boolean, "
+ " binary_col binary");
SnowflakeStreamingIngestChannel channel = openChannel(tableName);
Map<String, Object> value = new HashMap<>();
value.put("id", 0L);
value.put("int_col", 1L);
Expand All @@ -47,7 +55,11 @@ public void testPrimitiveColumns() throws Exception {
value.put("boolean_col", true);
value.put("binary_col", "4".getBytes());
verifyMultipleColumns(
tableName, Collections.singletonList(value), Collections.singletonList(value), "id");
tableName,
channel,
Collections.singletonList(value),
Collections.singletonList(value),
"id");

conn.createStatement()
.execute(
Expand All @@ -66,18 +78,45 @@ public void testPrimitiveColumns() throws Exception {
newValue.put("new_string_col", "7");
newValue.put("new_boolean_col", true);
newValue.put("new_binary_col", "8".getBytes());
Assertions.assertThatThrownBy(
() ->
verifyMultipleColumns(
tableName,
channel,
Collections.singletonList(newValue),
Arrays.asList(value, newValue),
"id"))
.isInstanceOf(SFException.class)
.hasMessage(
"The given row cannot be converted to the internal format: "
+ "Extra columns: [new_binary_col, new_boolean_col, new_int_col, new_string_col]. "
+ "Columns not present in the table shouldn't be specified, rowIndex:0")
.extracting("vendorCode")
.isEqualTo(ErrorCode.INVALID_FORMAT_ROW.getMessageCode());
channel.close();

SnowflakeStreamingIngestChannel newChannel = openChannel(tableName);
verifyMultipleColumns(
tableName, Collections.singletonList(newValue), Arrays.asList(value, newValue), "id");
tableName,
newChannel,
Collections.singletonList(newValue),
Arrays.asList(value, newValue),
"id");
}

@Test
public void testStructType() throws Exception {
String tableName = createIcebergTableWithColumns("id int, object_col object(a int)");
SnowflakeStreamingIngestChannel channel = openChannel(tableName);
Map<String, Object> value = new HashMap<>();
value.put("id", 0L);
value.put("object_col", Collections.singletonMap("a", 1));
verifyMultipleColumns(
tableName, Collections.singletonList(value), Collections.singletonList(value), "id");
tableName,
channel,
Collections.singletonList(value),
Collections.singletonList(value),
"id");

conn.createStatement()
.execute(
Expand All @@ -92,7 +131,6 @@ public void testStructType() throws Exception {
put("b", null);
}
});

Map<String, Object> newValue = new HashMap<>();
newValue.put("id", 1L);
newValue.put(
Expand All @@ -103,15 +141,38 @@ public void testStructType() throws Exception {
put("b", 3);
}
});
Assertions.assertThatThrownBy(
() ->
verifyMultipleColumns(
tableName,
channel,
Collections.singletonList(newValue),
Arrays.asList(value, newValue),
"id"))
.isInstanceOf(SFException.class)
.hasMessage(
"The given row cannot be converted to the internal format: Invalid row 0."
+ " missingNotNullColNames=null, extraColNames=[OBJECT_COL.b],"
+ " nullValueForNotNullColNames=null")
.extracting("vendorCode")
.isEqualTo(ErrorCode.INVALID_FORMAT_ROW.getMessageCode());
channel.close();

SnowflakeStreamingIngestChannel newChannel = openChannel(tableName);
verifyMultipleColumns(
tableName, Collections.singletonList(newValue), Arrays.asList(value, newValue), "id");
tableName,
newChannel,
Collections.singletonList(newValue),
Arrays.asList(value, newValue),
"id");
}

@Test
public void testNestedDataType() throws Exception {
String tableName =
createIcebergTableWithColumns(
"id int, object_col object(map_col map(string, array(object(a int))))");
SnowflakeStreamingIngestChannel channel = openChannel(tableName);
Map<String, Object> value = new HashMap<>();
value.put("id", 0L);
value.put(
Expand All @@ -121,7 +182,11 @@ public void testNestedDataType() throws Exception {
Collections.singletonMap(
"key", Collections.singletonList(Collections.singletonMap("a", 1)))));
verifyMultipleColumns(
tableName, Collections.singletonList(value), Collections.singletonList(value), "id");
tableName,
channel,
Collections.singletonList(value),
Collections.singletonList(value),
"id");

conn.createStatement()
.execute(
Expand Down Expand Up @@ -168,7 +233,30 @@ public void testNestedDataType() throws Exception {
put("map_col_2", Collections.singletonMap("key", 4));
}
});
Assertions.assertThatThrownBy(
() ->
verifyMultipleColumns(
tableName,
channel,
Collections.singletonList(newValue),
Arrays.asList(value, newValue),
"id"))
.isInstanceOf(SFException.class)
.hasMessage(
"The given row cannot be converted to the internal format: Invalid row 0."
+ " missingNotNullColNames=null,"
+ " extraColNames=[OBJECT_COL.map_col.key_value.value.list.element.b,"
+ " OBJECT_COL.map_col_2], nullValueForNotNullColNames=null")
.extracting("vendorCode")
.isEqualTo(ErrorCode.INVALID_FORMAT_ROW.getMessageCode());
channel.close();

SnowflakeStreamingIngestChannel newChannel = openChannel(tableName);
verifyMultipleColumns(
tableName, Collections.singletonList(newValue), Arrays.asList(value, newValue), "id");
tableName,
newChannel,
Collections.singletonList(newValue),
Arrays.asList(value, newValue),
"id");
}
}

0 comments on commit c939dc2

Please sign in to comment.