diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java index 8ab8b33e07..d76ab764b7 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java @@ -391,7 +391,9 @@ private List lenientizeSchemaChangeEvent(SchemaChangeEvent ev .getType() .nullable(), col.getAddColumn() - .getComment()))) + .getComment(), + col.getAddColumn() + .getDefaultValueExpression()))) .collect(Collectors.toList()))); } case DROP_COLUMN: diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivationTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivationTest.java index 9a2d1cfb4f..061cb12783 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivationTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivationTest.java @@ -105,14 +105,18 @@ void testOneToOneMapping() { AddColumnEvent.ColumnWithPosition newCol2 = new AddColumnEvent.ColumnWithPosition( new PhysicalColumn("new_col2", DataTypes.STRING(), null)); - List newColumns = Arrays.asList(newCol1, newCol2); + AddColumnEvent.ColumnWithPosition newCol3 = + new AddColumnEvent.ColumnWithPosition( + new PhysicalColumn("new_col3", DataTypes.STRING(), null, "abc")); + List newColumns = + Arrays.asList(newCol1, newCol2, newCol3); List derivedChangesAfterAddColumn = schemaDerivation.applySchemaChange(new AddColumnEvent(TABLE_1, newColumns)); assertThat(derivedChangesAfterAddColumn).hasSize(1); assertThat(derivedChangesAfterAddColumn.get(0)) .asAddColumnEvent() .hasTableId(MERGED_TABLE) - .containsAddedColumns(newCol1, newCol2); + .containsAddedColumns(newCol1, newCol2, newCol3); // Alter column type ImmutableMap typeMapping = ImmutableMap.of("age", DataTypes.BIGINT()); diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaManagerTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaManagerTest.java index 88e1264623..5adf50dcf3 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaManagerTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaManagerTest.java @@ -94,7 +94,10 @@ void testHandlingAddColumnEvent() { new AddColumnEvent.ColumnWithPosition( Column.physicalColumn("append_before_phone", DataTypes.BIGINT()), AddColumnEvent.ColumnPosition.BEFORE, - "phone")); + "phone"), + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn( + "col_with_default", DataTypes.BIGINT(), null, "10"))); schemaManager.applyEvolvedSchemaChange(new CreateTableEvent(CUSTOMERS, CUSTOMERS_SCHEMA)); schemaManager.applyEvolvedSchemaChange(new AddColumnEvent(CUSTOMERS, newColumns)); @@ -108,6 +111,7 @@ void testHandlingAddColumnEvent() { .physicalColumn("append_before_phone", DataTypes.BIGINT()) .physicalColumn("phone", DataTypes.BIGINT()) .physicalColumn("append_last", DataTypes.BIGINT()) + .physicalColumn("col_with_default", DataTypes.BIGINT(), null, "10") .primaryKey("id") .build()); }