Skip to content

Commit

Permalink
[FLINK-36375][cdc-runtime] fix missing default value in AddColumnEvent
Browse files Browse the repository at this point in the history
  • Loading branch information
qg-lin committed Sep 26, 2024
1 parent 17c0dc4 commit d638398
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,9 @@ private List<SchemaChangeEvent> lenientizeSchemaChangeEvent(SchemaChangeEvent ev
.getType()
.nullable(),
col.getAddColumn()
.getComment())))
.getComment(),
col.getAddColumn()
.getDefaultValueExpression())))
.collect(Collectors.toList())));
}
case DROP_COLUMN:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,14 +105,18 @@ void testOneToOneMapping() {
AddColumnEvent.ColumnWithPosition newCol2 =
new AddColumnEvent.ColumnWithPosition(
new PhysicalColumn("new_col2", DataTypes.STRING(), null));
List<AddColumnEvent.ColumnWithPosition> newColumns = Arrays.asList(newCol1, newCol2);
AddColumnEvent.ColumnWithPosition newCol3 =
new AddColumnEvent.ColumnWithPosition(
new PhysicalColumn("new_col3", DataTypes.STRING(), null, "abc"));
List<AddColumnEvent.ColumnWithPosition> newColumns =
Arrays.asList(newCol1, newCol2, newCol3);
List<SchemaChangeEvent> derivedChangesAfterAddColumn =
schemaDerivation.applySchemaChange(new AddColumnEvent(TABLE_1, newColumns));
assertThat(derivedChangesAfterAddColumn).hasSize(1);
assertThat(derivedChangesAfterAddColumn.get(0))
.asAddColumnEvent()
.hasTableId(MERGED_TABLE)
.containsAddedColumns(newCol1, newCol2);
.containsAddedColumns(newCol1, newCol2, newCol3);

// Alter column type
ImmutableMap<String, DataType> typeMapping = ImmutableMap.of("age", DataTypes.BIGINT());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,10 @@ void testHandlingAddColumnEvent() {
new AddColumnEvent.ColumnWithPosition(
Column.physicalColumn("append_before_phone", DataTypes.BIGINT()),
AddColumnEvent.ColumnPosition.BEFORE,
"phone"));
"phone"),
new AddColumnEvent.ColumnWithPosition(
Column.physicalColumn(
"col_with_default", DataTypes.BIGINT(), null, "10")));

schemaManager.applyEvolvedSchemaChange(new CreateTableEvent(CUSTOMERS, CUSTOMERS_SCHEMA));
schemaManager.applyEvolvedSchemaChange(new AddColumnEvent(CUSTOMERS, newColumns));
Expand All @@ -108,6 +111,7 @@ void testHandlingAddColumnEvent() {
.physicalColumn("append_before_phone", DataTypes.BIGINT())
.physicalColumn("phone", DataTypes.BIGINT())
.physicalColumn("append_last", DataTypes.BIGINT())
.physicalColumn("col_with_default", DataTypes.BIGINT(), null, "10")
.primaryKey("id")
.build());
}
Expand Down

0 comments on commit d638398

Please sign in to comment.