diff --git a/pom.xml b/pom.xml index 47bd67c86..46647d4be 100644 --- a/pom.xml +++ b/pom.xml @@ -339,7 +339,7 @@ net.snowflake snowflake-ingest-sdk - 3.0.0 + 3.0.1 net.snowflake diff --git a/pom_confluent.xml b/pom_confluent.xml index 14801e5a2..16168763a 100644 --- a/pom_confluent.xml +++ b/pom_confluent.xml @@ -480,7 +480,7 @@ net.snowflake snowflake-ingest-sdk - 3.0.0 + 3.0.1 net.snowflake diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergTableSchemaResolver.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergTableSchemaResolver.java index cf4df17e8..846606f1b 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergTableSchemaResolver.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergTableSchemaResolver.java @@ -61,26 +61,13 @@ private IcebergColumnSchema mapIcebergSchemaFromChannel( Map.Entry schemaFromChannelEntry) { ColumnProperties columnProperty = schemaFromChannelEntry.getValue(); - String plainIcebergSchema = getIcebergSchema(columnProperty); + String plainIcebergSchema = columnProperty.getIcebergSchema(); Type schema = IcebergDataTypeParser.deserializeIcebergType(plainIcebergSchema); String columnName = schemaFromChannelEntry.getKey(); return new IcebergColumnSchema(schema, columnName); } - // todo remove in 1820155 when getIcebergSchema() method is made public - private static String getIcebergSchema(ColumnProperties columnProperties) { - try { - java.lang.reflect.Field field = - columnProperties.getClass().getDeclaredField("icebergColumnSchema"); - field.setAccessible(true); - return (String) field.get(columnProperties); - } catch (IllegalAccessException | NoSuchFieldException e) { - throw new IllegalStateException( - "Couldn't set iceberg by accessing private field: isIceberg", e); - } - } - private boolean hasSchema(SinkRecord record) { return record.valueSchema() != null && record.valueSchema().fields() != null diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionNoSchemaEvolutionIT.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionNoSchemaEvolutionIT.java index 72ef11e7e..9600d4c56 100644 --- a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionNoSchemaEvolutionIT.java +++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionNoSchemaEvolutionIT.java @@ -12,7 +12,6 @@ import java.util.List; import java.util.stream.Collectors; import java.util.stream.Stream; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; @@ -81,7 +80,6 @@ private static Stream prepareData() { @ParameterizedTest(name = "{0}") @MethodSource("prepareData") - @Disabled void shouldInsertRecords(String description, String message, boolean withSchema) throws Exception { service.insert( diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java index 0ead14d20..4e6c11713 100644 --- a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java +++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java @@ -16,7 +16,6 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.kafka.connect.sink.SinkRecord; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; @@ -43,7 +42,6 @@ protected void createIcebergTable() { @ParameterizedTest(name = "{0}") @MethodSource("prepareData") - @Disabled void shouldEvolveSchemaAndInsertRecords( String description, String message, DescribeTableRow[] expectedSchema, boolean withSchema) throws Exception { @@ -123,7 +121,6 @@ private static Stream prepareData() { /** Verify a scenario when structure is enriched with another field. */ @Test - @Disabled public void alterStructure_noSchema() throws Exception { // k1, k2 String testStruct1 = "{ \"testStruct\": { \"k1\" : 1, \"k2\" : 2 } }"; @@ -201,7 +198,6 @@ private void assertRecordsInTable() { } @Test - @Disabled public void testComplexRecordEvolution_withSchema() throws Exception { insertWithRetry(complexJsonWithSchemaExample, 0, true); waitForOffset(1); @@ -240,7 +236,6 @@ public void testComplexRecordEvolution_withSchema() throws Exception { } @Test - @Disabled public void testComplexRecordEvolution() throws Exception { insertWithRetry(complexJsonPayloadExample, 0, false); waitForOffset(1); @@ -279,7 +274,6 @@ public void testComplexRecordEvolution() throws Exception { /** Test just for a scenario when we see a record for the first time. */ @ParameterizedTest @MethodSource("schemasAndPayloads_brandNewColumns") - @Disabled public void addBrandNewColumns_withSchema( String payloadWithSchema, String expectedColumnName, String expectedType) throws Exception { // when @@ -310,7 +304,6 @@ private static Stream schemasAndPayloads_brandNewColumns() { @ParameterizedTest @MethodSource("primitiveEvolutionDataSource") - @Disabled public void testEvolutionOfPrimitives_withSchema( String singleBooleanField, String booleanAndInt, @@ -399,7 +392,6 @@ private static Stream primitiveEvolutionDataSource() { @ParameterizedTest @MethodSource("testEvolutionOfComplexTypes_dataSource") - @Disabled public void testEvolutionOfComplexTypes_withSchema( String objectVarchar, String objectWithNestedObject, @@ -485,7 +477,6 @@ private static Stream testEvolutionOfComplexTypes_dataSource() { } @Test - @Disabled void shouldAppendCommentTest() throws Exception { // when // insert record with a comment diff --git a/test/test_suites.py b/test/test_suites.py index f34076e82..d6306d0ad 100644 --- a/test/test_suites.py +++ b/test/test_suites.py @@ -624,8 +624,8 @@ def create_end_to_end_test_suites(driver, nameSalt, schemaRegistryAddress, testS "TestIcebergJsonAws", EndToEndTestSuite( test_instance=TestIcebergJsonAws(driver, nameSalt), - run_in_confluent=False, # TODO set to true after ingest-sdk 3.0.1 release - run_in_apache=False, # TODO set to true after ingest-sdk 3.0.1 release + run_in_confluent=True, + run_in_apache=True, cloud_platform=CloudPlatform.AWS, ), ), @@ -633,7 +633,7 @@ def create_end_to_end_test_suites(driver, nameSalt, schemaRegistryAddress, testS "TestIcebergAvroAws", EndToEndTestSuite( test_instance=TestIcebergAvroAws(driver, nameSalt), - run_in_confluent=False, # TODO set to true after ingest-sdk 3.0.1 release + run_in_confluent=True, run_in_apache=False, cloud_platform=CloudPlatform.AWS, ), @@ -642,8 +642,8 @@ def create_end_to_end_test_suites(driver, nameSalt, schemaRegistryAddress, testS "TestIcebergSchemaEvolutionJsonAws", EndToEndTestSuite( test_instance=TestIcebergSchemaEvolutionJsonAws(driver, nameSalt), - run_in_confluent=False, # TODO set to true after ingest-sdk 3.0.1 release - run_in_apache=False, # TODO set to true after ingest-sdk 3.0.1 release + run_in_confluent=True, + run_in_apache=True, cloud_platform=CloudPlatform.AWS, ), ), @@ -651,7 +651,7 @@ def create_end_to_end_test_suites(driver, nameSalt, schemaRegistryAddress, testS "TestIcebergSchemaEvolutionAvroAws", EndToEndTestSuite( test_instance=TestIcebergSchemaEvolutionAvroAws(driver, nameSalt), - run_in_confluent=False, # TODO set to true after ingest-sdk 3.0.1 release + run_in_confluent=True, run_in_apache=False, cloud_platform=CloudPlatform.AWS, ),