Skip to content

Commit

Permalink
Add tests for wrong types
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-wtrefon committed Dec 5, 2024
1 parent 92d1d14 commit 7556f65
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package com.snowflake.kafka.connector.streaming.iceberg;

import static com.snowflake.kafka.connector.streaming.iceberg.sql.ComplexJsonRecord.complexJsonPayloadExample;
import static com.snowflake.kafka.connector.streaming.iceberg.sql.ComplexJsonRecord.complexJsonPayloadWithWrongValueTypeExample;
import static com.snowflake.kafka.connector.streaming.iceberg.sql.ComplexJsonRecord.complexJsonRecordValueExample;
import static org.assertj.core.api.Assertions.assertThat;

import com.snowflake.kafka.connector.Utils;
import com.snowflake.kafka.connector.dlq.InMemoryKafkaRecordErrorReporter;
import com.snowflake.kafka.connector.streaming.iceberg.sql.ComplexJsonRecord;
import com.snowflake.kafka.connector.streaming.iceberg.sql.MetadataRecord;
import com.snowflake.kafka.connector.streaming.iceberg.sql.MetadataRecord.RecordWithMetadata;
Expand All @@ -12,7 +15,9 @@
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
Expand Down Expand Up @@ -91,10 +96,34 @@ void shouldInsertRecords(String description, String message, boolean withSchema)
service.insert(Collections.singletonList(createKafkaRecord(message, 2, withSchema)));
waitForOffset(3);

assertRecordsInTable();
assertRecordsInTable(0L, 1L, 2L);
}

private void assertRecordsInTable() {
@Test
@Disabled
void shouldSendValueWithWrongTypeToDLQ() throws Exception {
SinkRecord wrongValueRecord1 =
createKafkaRecord(complexJsonPayloadWithWrongValueTypeExample, 0, false);
SinkRecord wrongValueRecord2 =
createKafkaRecord(complexJsonPayloadWithWrongValueTypeExample, 2, false);
service.insert(
Arrays.asList(
wrongValueRecord1,
createKafkaRecord(complexJsonPayloadExample, 1, false),
wrongValueRecord2,
createKafkaRecord(complexJsonPayloadExample, 3, false),
createKafkaRecord(complexJsonPayloadExample, 4, false)));
waitForOffset(5);

assertRecordsInTable(1L, 3L, 4L);
List<InMemoryKafkaRecordErrorReporter.ReportedRecord> reportedRecords =
kafkaRecordErrorReporter.getReportedRecords();
assertThat(reportedRecords).hasSize(2);
assertThat(reportedRecords.stream().map(it -> it.getRecord()).collect(Collectors.toList()))
.containsExactlyInAnyOrder(wrongValueRecord1, wrongValueRecord2);
}

private void assertRecordsInTable(Long... expectedOffsets) {
List<RecordWithMetadata<ComplexJsonRecord>> recordsWithMetadata =
selectAllComplexJsonRecordFromRecordContent();
assertThat(recordsWithMetadata)
Expand All @@ -108,7 +137,9 @@ private void assertRecordsInTable() {
recordsWithMetadata.stream()
.map(RecordWithMetadata::getMetadata)
.collect(Collectors.toList());
assertThat(metadataRecords).extracting(MetadataRecord::getOffset).containsExactly(0L, 1L, 2L);
assertThat(metadataRecords)
.extracting(MetadataRecord::getOffset)
.containsExactly(expectedOffsets);
assertThat(metadataRecords)
.hasSize(3)
.allMatch(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,27 @@ void nullOrEmptyValueShouldBeSentToDLQOnlyWhenNoSchema(
assertThat(reportedRecords.get(0).getRecord()).isEqualTo(emptyOrNullRecord);
}

@ParameterizedTest(name = "{0}")
@MethodSource("wrongTypeValueMessages_dataSource")
@Disabled
void shouldSendValueWithWrongTypeToDLQ(
String description, String correctValueJson, String wrongValueJson) throws Exception {
// when
// init schema with first correct value
insertWithRetry(correctValueJson, 0, false);

// insert record with wrong value followed by
SinkRecord wrongValueRecord = createKafkaRecord(wrongValueJson, 1, false);
service.insert(Arrays.asList(wrongValueRecord, createKafkaRecord(correctValueJson, 2, false)));

// then
waitForOffset(3);
List<InMemoryKafkaRecordErrorReporter.ReportedRecord> reportedRecords =
kafkaRecordErrorReporter.getReportedRecords();
assertThat(reportedRecords).hasSize(1);
assertThat(reportedRecords.get(0).getRecord()).isEqualTo(wrongValueRecord);
}

private static Stream<Arguments> nullOrEmptyValueShouldBeSentToDLQOnlyWhenNoSchema_dataSource() {
return Stream.of(
Arguments.of("Null int", "{\"test\" : null }", "{\"test\" : 1 }"),
Expand All @@ -554,4 +575,16 @@ private static Stream<Arguments> nullOrEmptyValueShouldBeSentToDLQOnlyWhenNoSche
"{\"test\" : {\"test2\": {}} }",
"{\"test\" : {\"test2\": {\"test3\": 1}} }"));
}

private static Stream<Arguments> wrongTypeValueMessages_dataSource() {
return Stream.of(
Arguments.of("Boolean into double column", "{\"test\" : 2.5 }", "{\"test\" : true }"),
Arguments.of("String into double column", "{\"test\" : 2.5 }", "{\"test\" : \"Solnik\" }"),
Arguments.of("Int into list", "{\"test\" : [1,2] }", "{\"test\" : 1 }"),
Arguments.of("Int into object", "{\"test\" : {\"test2\": 1} }", "{\"test\" : 1 }"),
Arguments.of(
"String into boolean in nested object",
"{\"test\" : {\"test2\": true} }",
"{\"test\" : {\"test2\": \"solnik\"} }"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ public class ComplexJsonRecord {
true,
ImmutableList.of(1, 2, 3),
ImmutableList.of("a", "b", "c"),
ImmutableList.of(),
null,
ImmutableList.of(true),
ImmutableList.of(1, 4),
ImmutableList.of(ImmutableList.of(7, 8, 9), ImmutableList.of(10, 11, 12)),
PrimitiveJsonRecord.primitiveJsonRecordValueExample,
null);
PrimitiveJsonRecord.primitiveJsonRecordValueExample);

public static final String complexJsonPayloadExample =
"{"
Expand All @@ -58,6 +58,27 @@ public class ComplexJsonRecord {
+ PrimitiveJsonRecord.primitiveJsonExample
+ "}";

public static final String complexJsonPayloadWithWrongValueTypeExample =
"{"
+ " \"id_int8\": 8,"
+ " \"id_int16\": 16,"
+ " \"id_int32\": 32,"
+ " \"id_int64\": 64,"
+ " \"description\": \"dogs are the best\","
+ " \"rating_float32\": 0.5,"
+ " \"rating_float64\": 0.25,"
+ " \"approval\": true,"
+ " \"array1\": [1, 2, 3],"
+ " \"array2\": [\"a\", \"b\", \"c\"],"
+ " \"array3\": [true],"
+ " \"array4\": [1, 4],"
+ " \"array5\": [[7, 8, 9], [10, 11, 12]],"
+ " \"nestedRecord\": "
+ PrimitiveJsonRecord.primitiveJsonExample
+ ","
+ " \"nestedRecord2\": 25"
+ "}";

public static final String complexJsonWithSchemaExample =
"{"
+ " \"schema\": {"
Expand Down

0 comments on commit 7556f65

Please sign in to comment.