Skip to content

Commit

Permalink
SNOW-1820302 Integrate with ingest sdk 3.0.1
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-mbobowski committed Dec 4, 2024
1 parent 90670e4 commit 9fa62c8
Show file tree
Hide file tree
Showing 6 changed files with 9 additions and 33 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@
<dependency>
<groupId>net.snowflake</groupId>
<artifactId>snowflake-ingest-sdk</artifactId>
<version>3.0.0</version>
<version>3.0.1</version>
<exclusions>
<exclusion>
<groupId>net.snowflake</groupId>
Expand Down
2 changes: 1 addition & 1 deletion pom_confluent.xml
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@
<dependency>
<groupId>net.snowflake</groupId>
<artifactId>snowflake-ingest-sdk</artifactId>
<version>3.0.0</version>
<version>3.0.1</version>
<exclusions>
<exclusion>
<groupId>net.snowflake</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,26 +61,13 @@ private IcebergColumnSchema mapIcebergSchemaFromChannel(
Map.Entry<String, ColumnProperties> schemaFromChannelEntry) {

ColumnProperties columnProperty = schemaFromChannelEntry.getValue();
String plainIcebergSchema = getIcebergSchema(columnProperty);
String plainIcebergSchema = columnProperty.getIcebergSchema();

Type schema = IcebergDataTypeParser.deserializeIcebergType(plainIcebergSchema);
String columnName = schemaFromChannelEntry.getKey();
return new IcebergColumnSchema(schema, columnName);
}

// todo remove in 1820155 when getIcebergSchema() method is made public
private static String getIcebergSchema(ColumnProperties columnProperties) {
try {
java.lang.reflect.Field field =
columnProperties.getClass().getDeclaredField("icebergColumnSchema");
field.setAccessible(true);
return (String) field.get(columnProperties);
} catch (IllegalAccessException | NoSuchFieldException e) {
throw new IllegalStateException(
"Couldn't set iceberg by accessing private field: isIceberg", e);
}
}

private boolean hasSchema(SinkRecord record) {
return record.valueSchema() != null
&& record.valueSchema().fields() != null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
Expand Down Expand Up @@ -81,7 +80,6 @@ private static Stream<Arguments> prepareData() {

@ParameterizedTest(name = "{0}")
@MethodSource("prepareData")
@Disabled
void shouldInsertRecords(String description, String message, boolean withSchema)
throws Exception {
service.insert(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
Expand All @@ -43,7 +42,6 @@ protected void createIcebergTable() {

@ParameterizedTest(name = "{0}")
@MethodSource("prepareData")
@Disabled
void shouldEvolveSchemaAndInsertRecords(
String description, String message, DescribeTableRow[] expectedSchema, boolean withSchema)
throws Exception {
Expand Down Expand Up @@ -123,7 +121,6 @@ private static Stream<Arguments> prepareData() {

/** Verify a scenario when structure is enriched with another field. */
@Test
@Disabled
public void alterStructure_noSchema() throws Exception {
// k1, k2
String testStruct1 = "{ \"testStruct\": { \"k1\" : 1, \"k2\" : 2 } }";
Expand Down Expand Up @@ -201,7 +198,6 @@ private void assertRecordsInTable() {
}

@Test
@Disabled
public void testComplexRecordEvolution_withSchema() throws Exception {
insertWithRetry(complexJsonWithSchemaExample, 0, true);
waitForOffset(1);
Expand Down Expand Up @@ -240,7 +236,6 @@ public void testComplexRecordEvolution_withSchema() throws Exception {
}

@Test
@Disabled
public void testComplexRecordEvolution() throws Exception {
insertWithRetry(complexJsonPayloadExample, 0, false);
waitForOffset(1);
Expand Down Expand Up @@ -279,7 +274,6 @@ public void testComplexRecordEvolution() throws Exception {
/** Test just for a scenario when we see a record for the first time. */
@ParameterizedTest
@MethodSource("schemasAndPayloads_brandNewColumns")
@Disabled
public void addBrandNewColumns_withSchema(
String payloadWithSchema, String expectedColumnName, String expectedType) throws Exception {
// when
Expand Down Expand Up @@ -310,7 +304,6 @@ private static Stream<Arguments> schemasAndPayloads_brandNewColumns() {

@ParameterizedTest
@MethodSource("primitiveEvolutionDataSource")
@Disabled
public void testEvolutionOfPrimitives_withSchema(
String singleBooleanField,
String booleanAndInt,
Expand Down Expand Up @@ -399,7 +392,6 @@ private static Stream<Arguments> primitiveEvolutionDataSource() {

@ParameterizedTest
@MethodSource("testEvolutionOfComplexTypes_dataSource")
@Disabled
public void testEvolutionOfComplexTypes_withSchema(
String objectVarchar,
String objectWithNestedObject,
Expand Down Expand Up @@ -485,7 +477,6 @@ private static Stream<Arguments> testEvolutionOfComplexTypes_dataSource() {
}

@Test
@Disabled
void shouldAppendCommentTest() throws Exception {
// when
// insert record with a comment
Expand Down
12 changes: 6 additions & 6 deletions test/test_suites.py
Original file line number Diff line number Diff line change
Expand Up @@ -624,16 +624,16 @@ def create_end_to_end_test_suites(driver, nameSalt, schemaRegistryAddress, testS
"TestIcebergJsonAws",
EndToEndTestSuite(
test_instance=TestIcebergJsonAws(driver, nameSalt),
run_in_confluent=False, # TODO set to true after ingest-sdk 3.0.1 release
run_in_apache=False, # TODO set to true after ingest-sdk 3.0.1 release
run_in_confluent=True,
run_in_apache=True,
cloud_platform=CloudPlatform.AWS,
),
),
(
"TestIcebergAvroAws",
EndToEndTestSuite(
test_instance=TestIcebergAvroAws(driver, nameSalt),
run_in_confluent=False, # TODO set to true after ingest-sdk 3.0.1 release
run_in_confluent=True,
run_in_apache=False,
cloud_platform=CloudPlatform.AWS,
),
Expand All @@ -642,16 +642,16 @@ def create_end_to_end_test_suites(driver, nameSalt, schemaRegistryAddress, testS
"TestIcebergSchemaEvolutionJsonAws",
EndToEndTestSuite(
test_instance=TestIcebergSchemaEvolutionJsonAws(driver, nameSalt),
run_in_confluent=False, # TODO set to true after ingest-sdk 3.0.1 release
run_in_apache=False, # TODO set to true after ingest-sdk 3.0.1 release
run_in_confluent=True,
run_in_apache=True,
cloud_platform=CloudPlatform.AWS,
),
),
(
"TestIcebergSchemaEvolutionAvroAws",
EndToEndTestSuite(
test_instance=TestIcebergSchemaEvolutionAvroAws(driver, nameSalt),
run_in_confluent=False, # TODO set to true after ingest-sdk 3.0.1 release
run_in_confluent=True,
run_in_apache=False,
cloud_platform=CloudPlatform.AWS,
),
Expand Down

0 comments on commit 9fa62c8

Please sign in to comment.