diff --git a/src/test/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/ParseIcebergColumnTreeTest.java b/src/test/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/ParseIcebergColumnTreeTest.java index acda05f39..6ab858b20 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/ParseIcebergColumnTreeTest.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/ParseIcebergColumnTreeTest.java @@ -120,8 +120,6 @@ static Stream parseFromJsonArguments() { + "}}", "OBJECT(vehicle2 OBJECT(car OBJECT(brand VARCHAR)), " + "vehicle1 OBJECT(car OBJECT(brand VARCHAR)))"), - // <- todo lol with k1, k2 the order is natural, however it changes an order when I used - // vehicles - inspect it arguments( "{ \"testColumnName\": {" + "\"k1\" : { \"car\" : { \"brand\" : \"vw\" } }," @@ -178,7 +176,10 @@ void mergeTwoTreesTest(String plainIcebergSchema, String recordJson, String expe static Stream mergeTestArguments() { return Stream.of( arguments( - "{\"type\":\"struct\",\"fields\":[{\"id\":23,\"name\":\"k1\",\"required\":false,\"type\":\"int\"},{\"id\":24,\"name\":\"k2\",\"required\":false,\"type\":\"int\"}]}", + "{\"type\":\"struct\",\"fields\":[" + + "{\"id\":23,\"name\":\"k1\",\"required\":false,\"type\":\"int\"}," + + "{\"id\":24,\"name\":\"k2\",\"required\":false,\"type\":\"int\"}" + + "]}", "{ \"testStruct\": { \"k1\" : 1, \"k2\" : 2, \"k3\" : 3 } }", "OBJECT(k1 INT, k2 INT, k3 LONG)"), arguments( @@ -201,13 +202,17 @@ static Stream mergeTestArguments() { "OBJECT(k1 INT, k2 INT, nested_object OBJECT(nested_key1" + " VARCHAR(16777216), nested_key2 VARCHAR(16777216), nested_object2" + " OBJECT(nested_key2 DOUBLE)))"), - // ARRAY evolution + // ARRAY merge arguments( "{\"type\":\"list\",\"element-id\":23,\"element\":\"long\",\"element-required\":false}", "{\"TESTSTRUCT\": [1,2,3] }", "ARRAY(LONG)"), arguments( - "{\"type\":\"list\",\"element-id\":1,\"element\":{\"type\":\"struct\",\"fields\":[{\"id\":1,\"name\":\"primitive\",\"required\":true,\"type\":\"boolean\"}]},\"element-required\":true}", + "{\"type\":\"list\",\"element-id\":1,\"element\":{" + + "\"type\":\"struct\",\"fields\":[" + + "{\"id\":1,\"name\":\"primitive\",\"required\":true,\"type\":\"boolean\"}" + + "]}," + + "\"element-required\":true}", "{\"TESTSTRUCT\": [ { \"primitive\" : true, \"new_field\" : 25878749237493287429348 }]" + " }", "ARRAY(OBJECT(primitive BOOLEAN, new_field LONG))")); diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionNoSchemaEvolutionIT.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionNoSchemaEvolutionIT.java index a58d42239..72ef11e7e 100644 --- a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionNoSchemaEvolutionIT.java +++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionNoSchemaEvolutionIT.java @@ -75,7 +75,8 @@ private static Stream prepareData() { return Stream.of( Arguments.of( "Complex JSON with schema", ComplexJsonRecord.complexJsonWithSchemaExample, true), - Arguments.of("Complex JSON without schema", ComplexJsonRecord.complexJsonExample, false)); + Arguments.of( + "Complex JSON without schema", ComplexJsonRecord.complexJsonPayloadExample, false)); } @ParameterizedTest(name = "{0}") diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java index 07ed6d3f5..6abed954d 100644 --- a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java +++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java @@ -16,6 +16,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.kafka.connect.sink.SinkRecord; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; @@ -35,6 +36,7 @@ protected void createIcebergTable() { @ParameterizedTest(name = "{0}") @MethodSource("prepareData") + @Disabled void shouldEvolveSchemaAndInsertRecords( String description, String message, DescribeTableRow[] expectedSchema, boolean withSchema) throws Exception { @@ -114,6 +116,7 @@ private static Stream prepareData() { /** Verify a scenario when structure is enriched with another field. */ @Test + @Disabled public void alterStructure_noSchema() throws Exception { // k1, k2 String testStruct1 = "{ \"testStruct\": { \"k1\" : 1, \"k2\" : 2 } }"; @@ -191,17 +194,84 @@ private void assertRecordsInTable() { } @Test + @Disabled public void testComplexRecordEvolution_withSchema() throws Exception { insertWithRetry(complexJsonWithSchemaExample, 0, true); waitForOffset(1); List columns = describeTable(tableName); assertEquals(columns.size(), 16); + + DescribeTableRow[] expectedSchema = + new DescribeTableRow[] { + new DescribeTableRow("RECORD_METADATA", RECORD_METADATA_TYPE), + new DescribeTableRow("ID_INT8", "NUMBER(10,0)"), + new DescribeTableRow("ID_INT16", "NUMBER(10,0)"), + new DescribeTableRow("ID_INT32", "NUMBER(10,0)"), + new DescribeTableRow("ID_INT64", "NUMBER(19,0)"), + new DescribeTableRow("DESCRIPTION", "VARCHAR(16777216)"), + new DescribeTableRow("RATING_FLOAT32", "FLOAT"), + new DescribeTableRow("RATING_FLOAT64", "FLOAT"), + new DescribeTableRow("APPROVAL", "BOOLEAN"), + new DescribeTableRow("ARRAY1", "ARRAY(NUMBER(10,0))"), + new DescribeTableRow("ARRAY2", "ARRAY(VARCHAR(16777216))"), + new DescribeTableRow("ARRAY3", "ARRAY(BOOLEAN)"), + new DescribeTableRow("ARRAY4", "ARRAY(NUMBER(10,0))"), + new DescribeTableRow("ARRAY5", "ARRAY(ARRAY(NUMBER(10,0)))"), + new DescribeTableRow( + "NESTEDRECORD", + "OBJECT(id_int8 NUMBER(10,0), id_int16 NUMBER(10,0), id_int32 NUMBER(10,0), id_int64" + + " NUMBER(19,0), description VARCHAR(16777216), rating_float32 FLOAT," + + " rating_float64 FLOAT, approval BOOLEAN)"), + new DescribeTableRow( + "NESTEDRECORD2", + "OBJECT(id_int8 NUMBER(10,0), id_int16 NUMBER(10,0), id_int32 NUMBER(10,0), id_int64" + + " NUMBER(19,0), description VARCHAR(16777216), rating_float32 FLOAT," + + " rating_float64 FLOAT, approval BOOLEAN)"), + }; + assertThat(columns).containsExactlyInAnyOrder(expectedSchema); + } + + @Test + public void testComplexRecordEvolution() throws Exception { + insertWithRetry(complexJsonPayloadExample, 0, false); + waitForOffset(1); + + List columns = describeTable(tableName); + assertEquals(columns.size(), 16); + + DescribeTableRow[] expectedSchema = + new DescribeTableRow[] { + new DescribeTableRow("RECORD_METADATA", RECORD_METADATA_TYPE), + new DescribeTableRow("ID_INT8", "NUMBER(19,0)"), + new DescribeTableRow("ID_INT16", "NUMBER(19,0)"), + new DescribeTableRow("ID_INT32", "NUMBER(19,0)"), + new DescribeTableRow("ID_INT64", "NUMBER(19,0)"), + new DescribeTableRow("DESCRIPTION", "VARCHAR(16777216)"), + new DescribeTableRow("RATING_FLOAT32", "FLOAT"), + new DescribeTableRow("RATING_FLOAT64", "FLOAT"), + new DescribeTableRow("APPROVAL", "BOOLEAN"), + new DescribeTableRow("ARRAY1", "ARRAY(NUMBER(19,0))"), + new DescribeTableRow("ARRAY2", "ARRAY(VARCHAR(16777216))"), + new DescribeTableRow("ARRAY3", "ARRAY(BOOLEAN)"), + // "array4" : null -> VARCHAR(16777216 + new DescribeTableRow("ARRAY4", "VARCHAR(16777216)"), + new DescribeTableRow("ARRAY5", "ARRAY(ARRAY(NUMBER(19,0)))"), + new DescribeTableRow( + "NESTEDRECORD", + "OBJECT(id_int8 NUMBER(19,0), id_int16 NUMBER(19,0), rating_float32 FLOAT," + + " rating_float64 FLOAT, approval BOOLEAN, id_int32 NUMBER(19,0), description" + + " VARCHAR(16777216), id_int64 NUMBER(19,0))"), + // "nestedRecord2": null -> VARCHAR(16777216) + new DescribeTableRow("NESTEDRECORD2", "VARCHAR(16777216)"), + }; + assertThat(columns).containsExactlyInAnyOrder(expectedSchema); } /** Test just for a scenario when we see a record for the first time. */ @ParameterizedTest @MethodSource("schemasAndPayloads_brandNewColumns") + @Disabled public void addBrandNewColumns_withSchema( String payloadWithSchema, String expectedColumnName, String expectedType) throws Exception { // when @@ -232,6 +302,7 @@ private static Stream schemasAndPayloads_brandNewColumns() { @ParameterizedTest @MethodSource("primitiveEvolutionDataSource") + @Disabled public void testEvolutionOfPrimitives_withSchema( String singleBooleanField, String booleanAndInt, @@ -320,6 +391,7 @@ private static Stream primitiveEvolutionDataSource() { @ParameterizedTest @MethodSource("testEvolutionOfComplexTypes_dataSource") + @Disabled public void testEvolutionOfComplexTypes_withSchema( String objectVarchar, String objectWithNestedObject, @@ -404,66 +476,6 @@ private static Stream testEvolutionOfComplexTypes_dataSource() { false)); } - @Test - public void evolveSchemaRandomDataTest() throws Exception { - String testStruct1 = - "{" - + " \"_id\": \"673f3b93a56dd01a8a0cb6a4\"," - + " \"index\": 0," - + " \"guid\": \"738142bb-5878-42ad-bf35-6015f63b67dd\"," - + " \"isActive\": true," - + " \"balance\": \"$1,690.88\"," - + " \"picture\": \"http://placehold.it/32x32\"," - + " \"age\": 38," - + " \"eyeColor\": \"blue\"," - + " \"name\": \"Davis Heath\"," - + " \"gender\": \"male\"," - + " \"company\": \"EVENTEX\"," - + " \"email\": \"davisheath@eventex.com\"," - + " \"phone\": \"+1 (987) 471-3852\"," - + " \"address\": \"768 Cypress Court, Lookingglass, Kansas, 5659\"," - + " \"about\": \"Nisi voluptate id occaecat nisi pariatur dolore laborum labore ea" - + " reprehenderit consequat sint fugiat sunt. Et consequat esse ex cillum deserunt" - + " Lorem. Enim nisi tempor non nisi. Consectetur ut ad reprehenderit fugiat et" - + " adipisicing sint. Deserunt est proident exercitation sit cillum in non excepteur" - + " aliqua qui amet cillum sint aliquip.\\r" - + "\"," - + " \"registered\": \"2023-09-19T09:41:45 -02:00\"," - + " \"latitude\": 13.997901," - + " \"longitude\": 130.854106," - + " \"tags\": [" - + " \"et\"," - + " \"ut\"," - + " \"elit\"," - + " \"do\"," - + " \"nostrud\"," - + " \"id\"," - + " \"veniam\"" - + " ]," - + " \"friends\": [" - + " {" - + " \"id\": 0," - + " \"name\": \"Sandoval Hodges\"" - + " }," - + " {" - + " \"id\": 1," - + " \"name\": \"Ramirez Brooks\"" - + " }," - + " {" - + " \"id\": 2," - + " \"name\": \"Vivian Whitfield\"" - + " }" - + " ]," - + " \"greeting\": \"Hello, Davis Heath! You have 4 unread messages.\"," - + " \"favoriteFruit\": \"strawberry\"" - + " }"; - insertWithRetry(testStruct1, 0, false); - waitForOffset(1); - - List columns = describeTable(tableName); - assertEquals(columns.size(), 23); - } - private static final String RECORD_METADATA_TYPE = "OBJECT(offset NUMBER(10,0), topic VARCHAR(16777216), partition NUMBER(10,0), key" + " VARCHAR(16777216), schema_id NUMBER(10,0), key_schema_id NUMBER(10,0)," diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/sql/ComplexJsonRecord.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/sql/ComplexJsonRecord.java index 92adbb66d..677e8f80c 100644 --- a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/sql/ComplexJsonRecord.java +++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/sql/ComplexJsonRecord.java @@ -36,7 +36,7 @@ public class ComplexJsonRecord { PrimitiveJsonRecord.primitiveJsonRecordValueExample, null); - public static final String complexJsonExample = + public static final String complexJsonPayloadExample = "{" + " \"id_int8\": 8," + " \"id_int16\": 16," @@ -218,7 +218,7 @@ public class ComplexJsonRecord { + " \"name\": \"sf.kc.test\"" + " }," + " \"payload\": " - + complexJsonExample + + complexJsonPayloadExample + "}"; private static final ObjectMapper MAPPER =