-
Notifications
You must be signed in to change notification settings - Fork 100
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
b1108f8
commit 2cac1db
Showing
4 changed files
with
86 additions
and
68 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,6 +16,7 @@ | |
import java.util.stream.Collectors; | ||
import java.util.stream.Stream; | ||
import org.apache.kafka.connect.sink.SinkRecord; | ||
import org.junit.jupiter.api.Disabled; | ||
import org.junit.jupiter.api.Test; | ||
import org.junit.jupiter.params.ParameterizedTest; | ||
import org.junit.jupiter.params.provider.Arguments; | ||
|
@@ -35,6 +36,7 @@ protected void createIcebergTable() { | |
|
||
@ParameterizedTest(name = "{0}") | ||
@MethodSource("prepareData") | ||
@Disabled | ||
void shouldEvolveSchemaAndInsertRecords( | ||
String description, String message, DescribeTableRow[] expectedSchema, boolean withSchema) | ||
throws Exception { | ||
|
@@ -114,6 +116,7 @@ private static Stream<Arguments> prepareData() { | |
|
||
/** Verify a scenario when structure is enriched with another field. */ | ||
@Test | ||
@Disabled | ||
public void alterStructure_noSchema() throws Exception { | ||
// k1, k2 | ||
String testStruct1 = "{ \"testStruct\": { \"k1\" : 1, \"k2\" : 2 } }"; | ||
|
@@ -191,17 +194,84 @@ private void assertRecordsInTable() { | |
} | ||
|
||
@Test | ||
@Disabled | ||
public void testComplexRecordEvolution_withSchema() throws Exception { | ||
insertWithRetry(complexJsonWithSchemaExample, 0, true); | ||
waitForOffset(1); | ||
|
||
List<DescribeTableRow> columns = describeTable(tableName); | ||
assertEquals(columns.size(), 16); | ||
|
||
DescribeTableRow[] expectedSchema = | ||
new DescribeTableRow[] { | ||
new DescribeTableRow("RECORD_METADATA", RECORD_METADATA_TYPE), | ||
new DescribeTableRow("ID_INT8", "NUMBER(10,0)"), | ||
new DescribeTableRow("ID_INT16", "NUMBER(10,0)"), | ||
new DescribeTableRow("ID_INT32", "NUMBER(10,0)"), | ||
new DescribeTableRow("ID_INT64", "NUMBER(19,0)"), | ||
new DescribeTableRow("DESCRIPTION", "VARCHAR(16777216)"), | ||
new DescribeTableRow("RATING_FLOAT32", "FLOAT"), | ||
new DescribeTableRow("RATING_FLOAT64", "FLOAT"), | ||
new DescribeTableRow("APPROVAL", "BOOLEAN"), | ||
new DescribeTableRow("ARRAY1", "ARRAY(NUMBER(10,0))"), | ||
new DescribeTableRow("ARRAY2", "ARRAY(VARCHAR(16777216))"), | ||
new DescribeTableRow("ARRAY3", "ARRAY(BOOLEAN)"), | ||
new DescribeTableRow("ARRAY4", "ARRAY(NUMBER(10,0))"), | ||
new DescribeTableRow("ARRAY5", "ARRAY(ARRAY(NUMBER(10,0)))"), | ||
new DescribeTableRow( | ||
"NESTEDRECORD", | ||
"OBJECT(id_int8 NUMBER(10,0), id_int16 NUMBER(10,0), id_int32 NUMBER(10,0), id_int64" | ||
+ " NUMBER(19,0), description VARCHAR(16777216), rating_float32 FLOAT," | ||
+ " rating_float64 FLOAT, approval BOOLEAN)"), | ||
new DescribeTableRow( | ||
"NESTEDRECORD2", | ||
"OBJECT(id_int8 NUMBER(10,0), id_int16 NUMBER(10,0), id_int32 NUMBER(10,0), id_int64" | ||
+ " NUMBER(19,0), description VARCHAR(16777216), rating_float32 FLOAT," | ||
+ " rating_float64 FLOAT, approval BOOLEAN)"), | ||
}; | ||
assertThat(columns).containsExactlyInAnyOrder(expectedSchema); | ||
} | ||
|
||
@Test | ||
public void testComplexRecordEvolution() throws Exception { | ||
insertWithRetry(complexJsonPayloadExample, 0, false); | ||
waitForOffset(1); | ||
|
||
List<DescribeTableRow> columns = describeTable(tableName); | ||
assertEquals(columns.size(), 16); | ||
|
||
DescribeTableRow[] expectedSchema = | ||
new DescribeTableRow[] { | ||
new DescribeTableRow("RECORD_METADATA", RECORD_METADATA_TYPE), | ||
new DescribeTableRow("ID_INT8", "NUMBER(19,0)"), | ||
new DescribeTableRow("ID_INT16", "NUMBER(19,0)"), | ||
new DescribeTableRow("ID_INT32", "NUMBER(19,0)"), | ||
new DescribeTableRow("ID_INT64", "NUMBER(19,0)"), | ||
new DescribeTableRow("DESCRIPTION", "VARCHAR(16777216)"), | ||
new DescribeTableRow("RATING_FLOAT32", "FLOAT"), | ||
new DescribeTableRow("RATING_FLOAT64", "FLOAT"), | ||
new DescribeTableRow("APPROVAL", "BOOLEAN"), | ||
new DescribeTableRow("ARRAY1", "ARRAY(NUMBER(19,0))"), | ||
new DescribeTableRow("ARRAY2", "ARRAY(VARCHAR(16777216))"), | ||
new DescribeTableRow("ARRAY3", "ARRAY(BOOLEAN)"), | ||
// "array4" : null -> VARCHAR(16777216 | ||
new DescribeTableRow("ARRAY4", "VARCHAR(16777216)"), | ||
new DescribeTableRow("ARRAY5", "ARRAY(ARRAY(NUMBER(19,0)))"), | ||
new DescribeTableRow( | ||
"NESTEDRECORD", | ||
"OBJECT(id_int8 NUMBER(19,0), id_int16 NUMBER(19,0), rating_float32 FLOAT," | ||
+ " rating_float64 FLOAT, approval BOOLEAN, id_int32 NUMBER(19,0), description" | ||
+ " VARCHAR(16777216), id_int64 NUMBER(19,0))"), | ||
// "nestedRecord2": null -> VARCHAR(16777216) | ||
new DescribeTableRow("NESTEDRECORD2", "VARCHAR(16777216)"), | ||
}; | ||
assertThat(columns).containsExactlyInAnyOrder(expectedSchema); | ||
} | ||
|
||
/** Test just for a scenario when we see a record for the first time. */ | ||
@ParameterizedTest | ||
@MethodSource("schemasAndPayloads_brandNewColumns") | ||
@Disabled | ||
public void addBrandNewColumns_withSchema( | ||
String payloadWithSchema, String expectedColumnName, String expectedType) throws Exception { | ||
// when | ||
|
@@ -232,6 +302,7 @@ private static Stream<Arguments> schemasAndPayloads_brandNewColumns() { | |
|
||
@ParameterizedTest | ||
@MethodSource("primitiveEvolutionDataSource") | ||
@Disabled | ||
public void testEvolutionOfPrimitives_withSchema( | ||
String singleBooleanField, | ||
String booleanAndInt, | ||
|
@@ -320,6 +391,7 @@ private static Stream<Arguments> primitiveEvolutionDataSource() { | |
|
||
@ParameterizedTest | ||
@MethodSource("testEvolutionOfComplexTypes_dataSource") | ||
@Disabled | ||
public void testEvolutionOfComplexTypes_withSchema( | ||
String objectVarchar, | ||
String objectWithNestedObject, | ||
|
@@ -404,66 +476,6 @@ private static Stream<Arguments> testEvolutionOfComplexTypes_dataSource() { | |
false)); | ||
} | ||
|
||
@Test | ||
public void evolveSchemaRandomDataTest() throws Exception { | ||
String testStruct1 = | ||
"{" | ||
+ " \"_id\": \"673f3b93a56dd01a8a0cb6a4\"," | ||
+ " \"index\": 0," | ||
+ " \"guid\": \"738142bb-5878-42ad-bf35-6015f63b67dd\"," | ||
+ " \"isActive\": true," | ||
+ " \"balance\": \"$1,690.88\"," | ||
+ " \"picture\": \"http://placehold.it/32x32\"," | ||
+ " \"age\": 38," | ||
+ " \"eyeColor\": \"blue\"," | ||
+ " \"name\": \"Davis Heath\"," | ||
+ " \"gender\": \"male\"," | ||
+ " \"company\": \"EVENTEX\"," | ||
+ " \"email\": \"[email protected]\"," | ||
+ " \"phone\": \"+1 (987) 471-3852\"," | ||
+ " \"address\": \"768 Cypress Court, Lookingglass, Kansas, 5659\"," | ||
+ " \"about\": \"Nisi voluptate id occaecat nisi pariatur dolore laborum labore ea" | ||
+ " reprehenderit consequat sint fugiat sunt. Et consequat esse ex cillum deserunt" | ||
+ " Lorem. Enim nisi tempor non nisi. Consectetur ut ad reprehenderit fugiat et" | ||
+ " adipisicing sint. Deserunt est proident exercitation sit cillum in non excepteur" | ||
+ " aliqua qui amet cillum sint aliquip.\\r" | ||
+ "\"," | ||
+ " \"registered\": \"2023-09-19T09:41:45 -02:00\"," | ||
+ " \"latitude\": 13.997901," | ||
+ " \"longitude\": 130.854106," | ||
+ " \"tags\": [" | ||
+ " \"et\"," | ||
+ " \"ut\"," | ||
+ " \"elit\"," | ||
+ " \"do\"," | ||
+ " \"nostrud\"," | ||
+ " \"id\"," | ||
+ " \"veniam\"" | ||
+ " ]," | ||
+ " \"friends\": [" | ||
+ " {" | ||
+ " \"id\": 0," | ||
+ " \"name\": \"Sandoval Hodges\"" | ||
+ " }," | ||
+ " {" | ||
+ " \"id\": 1," | ||
+ " \"name\": \"Ramirez Brooks\"" | ||
+ " }," | ||
+ " {" | ||
+ " \"id\": 2," | ||
+ " \"name\": \"Vivian Whitfield\"" | ||
+ " }" | ||
+ " ]," | ||
+ " \"greeting\": \"Hello, Davis Heath! You have 4 unread messages.\"," | ||
+ " \"favoriteFruit\": \"strawberry\"" | ||
+ " }"; | ||
insertWithRetry(testStruct1, 0, false); | ||
waitForOffset(1); | ||
|
||
List<DescribeTableRow> columns = describeTable(tableName); | ||
assertEquals(columns.size(), 23); | ||
} | ||
|
||
private static final String RECORD_METADATA_TYPE = | ||
"OBJECT(offset NUMBER(10,0), topic VARCHAR(16777216), partition NUMBER(10,0), key" | ||
+ " VARCHAR(16777216), schema_id NUMBER(10,0), key_schema_id NUMBER(10,0)," | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters