From d638398de7437cbdc9d4551c57d1ce995cad6d25 Mon Sep 17 00:00:00 2001 From: "north.lin" <37775475+qg-lin@users.noreply.github.com> Date: Thu, 26 Sep 2024 13:14:32 +0800 Subject: [PATCH] [FLINK-36375][cdc-runtime] fix missing default value in AddColumnEvent --- .../schema/coordinator/SchemaRegistryRequestHandler.java | 4 +++- .../schema/coordinator/SchemaDerivationTest.java | 8 ++++++-- .../operators/schema/coordinator/SchemaManagerTest.java | 6 +++++- 3 files changed, 14 insertions(+), 4 deletions(-) diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java index 8ab8b33e07..d76ab764b7 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java @@ -391,7 +391,9 @@ private List lenientizeSchemaChangeEvent(SchemaChangeEvent ev .getType() .nullable(), col.getAddColumn() - .getComment()))) + .getComment(), + col.getAddColumn() + .getDefaultValueExpression()))) .collect(Collectors.toList()))); } case DROP_COLUMN: diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivationTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivationTest.java index 9a2d1cfb4f..061cb12783 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivationTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivationTest.java @@ -105,14 +105,18 @@ void testOneToOneMapping() { AddColumnEvent.ColumnWithPosition newCol2 = new AddColumnEvent.ColumnWithPosition( new PhysicalColumn("new_col2", DataTypes.STRING(), null)); - List newColumns = Arrays.asList(newCol1, newCol2); + AddColumnEvent.ColumnWithPosition newCol3 = + new AddColumnEvent.ColumnWithPosition( + new PhysicalColumn("new_col3", DataTypes.STRING(), null, "abc")); + List newColumns = + Arrays.asList(newCol1, newCol2, newCol3); List derivedChangesAfterAddColumn = schemaDerivation.applySchemaChange(new AddColumnEvent(TABLE_1, newColumns)); assertThat(derivedChangesAfterAddColumn).hasSize(1); assertThat(derivedChangesAfterAddColumn.get(0)) .asAddColumnEvent() .hasTableId(MERGED_TABLE) - .containsAddedColumns(newCol1, newCol2); + .containsAddedColumns(newCol1, newCol2, newCol3); // Alter column type ImmutableMap typeMapping = ImmutableMap.of("age", DataTypes.BIGINT()); diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaManagerTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaManagerTest.java index 88e1264623..5adf50dcf3 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaManagerTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaManagerTest.java @@ -94,7 +94,10 @@ void testHandlingAddColumnEvent() { new AddColumnEvent.ColumnWithPosition( Column.physicalColumn("append_before_phone", DataTypes.BIGINT()), AddColumnEvent.ColumnPosition.BEFORE, - "phone")); + "phone"), + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn( + "col_with_default", DataTypes.BIGINT(), null, "10"))); schemaManager.applyEvolvedSchemaChange(new CreateTableEvent(CUSTOMERS, CUSTOMERS_SCHEMA)); schemaManager.applyEvolvedSchemaChange(new AddColumnEvent(CUSTOMERS, newColumns)); @@ -108,6 +111,7 @@ void testHandlingAddColumnEvent() { .physicalColumn("append_before_phone", DataTypes.BIGINT()) .physicalColumn("phone", DataTypes.BIGINT()) .physicalColumn("append_last", DataTypes.BIGINT()) + .physicalColumn("col_with_default", DataTypes.BIGINT(), null, "10") .primaryKey("id") .build()); }