From 32d3f1ad0d669ba2f4fc7d771ce4ac075c5dc992 Mon Sep 17 00:00:00 2001 From: North Lin <37775475+qg-lin@users.noreply.github.com> Date: Fri, 20 Sep 2024 17:33:30 +0800 Subject: [PATCH 1/2] [minor][cdc-connector][mongodb] Add `op_type` metadata column --- .../connectors/flink-sources/mongodb-cdc.md | 5 ++ .../connectors/flink-sources/mongodb-cdc.md | 5 ++ .../table/MongoDBReadableMetadata.java | 14 ++++ .../source/MongoDBFullChangelogITCase.java | 67 ++++++++++--------- 4 files changed, 58 insertions(+), 33 deletions(-) diff --git a/docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md b/docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md index 83103c9218..7675805e30 100644 --- a/docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md +++ b/docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md @@ -332,6 +332,11 @@ upstart 流需要一个唯一的密钥,所以我们必须声明 `_id` 作为 TIMESTAMP_LTZ(3) NOT NULL 它指示在数据库中进行更改的时间。
如果记录是从表的快照而不是改变流中读取的,该值将始终为0。 + + op_type + STRING NOT NULL + 该行的操作类型。 + diff --git a/docs/content/docs/connectors/flink-sources/mongodb-cdc.md b/docs/content/docs/connectors/flink-sources/mongodb-cdc.md index 9ffbf184d4..be7e3cace6 100644 --- a/docs/content/docs/connectors/flink-sources/mongodb-cdc.md +++ b/docs/content/docs/connectors/flink-sources/mongodb-cdc.md @@ -357,6 +357,11 @@ The following format metadata can be exposed as read-only (VIRTUAL) columns in a TIMESTAMP_LTZ(3) NOT NULL It indicates the time that the change was made in the database.
If the record is read from snapshot of the table instead of the change stream, the value is always 0. + + op_type + STRING NOT NULL + Operation type of the row. + diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBReadableMetadata.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBReadableMetadata.java index c2baf021c9..c4679da6b9 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBReadableMetadata.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBReadableMetadata.java @@ -81,6 +81,20 @@ public Object read(SourceRecord record) { return TimestampData.fromEpochMillis( (Long) source.get(AbstractSourceInfo.TIMESTAMP_KEY)); } + }), + + OP_TYPE( + "op_type", + DataTypes.STRING().notNull(), + new MetadataConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object read(SourceRecord record) { + Struct value = (Struct) record.value(); + String opType = value.getString(MongoDBEnvelope.OPERATION_TYPE_FIELD); + return StringData.fromString(opType); + } }); private final String key; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java index 8d8047fa72..796bbfbad5 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java @@ -648,6 +648,7 @@ private void testMongoDBParallelSource( + " name STRING," + " address STRING," + " phone_number STRING," + + " op_type STRING METADATA FROM 'op_type' VIRTUAL," + " primary key (_id) not enforced" + ") WITH (" + " 'connector' = 'mongodb-cdc'," @@ -674,31 +675,31 @@ private void testMongoDBParallelSource( // first step: check the snapshot data String[] snapshotForSingleTable = new String[] { - "+I[101, user_1, Shanghai, 123567891234]", - "+I[102, user_2, Shanghai, 123567891234]", - "+I[103, user_3, Shanghai, 123567891234]", - "+I[109, user_4, Shanghai, 123567891234]", - "+I[110, user_5, Shanghai, 123567891234]", - "+I[111, user_6, Shanghai, 123567891234]", - "+I[118, user_7, Shanghai, 123567891234]", - "+I[121, user_8, Shanghai, 123567891234]", - "+I[123, user_9, Shanghai, 123567891234]", - "+I[1009, user_10, Shanghai, 123567891234]", - "+I[1010, user_11, Shanghai, 123567891234]", - "+I[1011, user_12, Shanghai, 123567891234]", - "+I[1012, user_13, Shanghai, 123567891234]", - "+I[1013, user_14, Shanghai, 123567891234]", - "+I[1014, user_15, Shanghai, 123567891234]", - "+I[1015, user_16, Shanghai, 123567891234]", - "+I[1016, user_17, Shanghai, 123567891234]", - "+I[1017, user_18, Shanghai, 123567891234]", - "+I[1018, user_19, Shanghai, 123567891234]", - "+I[1019, user_20, Shanghai, 123567891234]", - "+I[2000, user_21, Shanghai, 123567891234]" + "+I[101, user_1, Shanghai, 123567891234, insert]", + "+I[102, user_2, Shanghai, 123567891234, insert]", + "+I[103, user_3, Shanghai, 123567891234, insert]", + "+I[109, user_4, Shanghai, 123567891234, insert]", + "+I[110, user_5, Shanghai, 123567891234, insert]", + "+I[111, user_6, Shanghai, 123567891234, insert]", + "+I[118, user_7, Shanghai, 123567891234, insert]", + "+I[121, user_8, Shanghai, 123567891234, insert]", + "+I[123, user_9, Shanghai, 123567891234, insert]", + "+I[1009, user_10, Shanghai, 123567891234, insert]", + "+I[1010, user_11, Shanghai, 123567891234, insert]", + "+I[1011, user_12, Shanghai, 123567891234, insert]", + "+I[1012, user_13, Shanghai, 123567891234, insert]", + "+I[1013, user_14, Shanghai, 123567891234, insert]", + "+I[1014, user_15, Shanghai, 123567891234, insert]", + "+I[1015, user_16, Shanghai, 123567891234, insert]", + "+I[1016, user_17, Shanghai, 123567891234, insert]", + "+I[1017, user_18, Shanghai, 123567891234, insert]", + "+I[1018, user_19, Shanghai, 123567891234, insert]", + "+I[1019, user_20, Shanghai, 123567891234, insert]", + "+I[2000, user_21, Shanghai, 123567891234, insert]" }; tEnv.executeSql(sourceDDL); TableResult tableResult = - tEnv.executeSql("select cid, name, address, phone_number from customers"); + tEnv.executeSql("select cid, name, address, phone_number, op_type from customers"); CloseableIterator iterator = tableResult.collect(); JobID jobId = tableResult.getJobClient().get().getJobID(); List expectedSnapshotData = new ArrayList<>(); @@ -731,17 +732,17 @@ private void testMongoDBParallelSource( String[] changeEventsForSingleTable = new String[] { - "-U[101, user_1, Shanghai, 123567891234]", - "+U[101, user_1, Hangzhou, 123567891234]", - "-D[102, user_2, Shanghai, 123567891234]", - "+I[102, user_2, Shanghai, 123567891234]", - "-U[103, user_3, Shanghai, 123567891234]", - "+U[103, user_3, Hangzhou, 123567891234]", - "-U[1010, user_11, Shanghai, 123567891234]", - "+U[1010, user_11, Hangzhou, 123567891234]", - "+I[2001, user_22, Shanghai, 123567891234]", - "+I[2002, user_23, Shanghai, 123567891234]", - "+I[2003, user_24, Shanghai, 123567891234]" + "-U[101, user_1, Shanghai, 123567891234, update]", + "+U[101, user_1, Hangzhou, 123567891234, update]", + "-D[102, user_2, Shanghai, 123567891234, delete]", + "+I[102, user_2, Shanghai, 123567891234, insert]", + "-U[103, user_3, Shanghai, 123567891234, update]", + "+U[103, user_3, Hangzhou, 123567891234, update]", + "-U[1010, user_11, Shanghai, 123567891234, update]", + "+U[1010, user_11, Hangzhou, 123567891234, update]", + "+I[2001, user_22, Shanghai, 123567891234, insert]", + "+I[2002, user_23, Shanghai, 123567891234, insert]", + "+I[2003, user_24, Shanghai, 123567891234, insert]" }; List expectedChangeStreamData = new ArrayList<>(); for (int i = 0; i < captureCustomerCollections.length; i++) { From 19e38b0802530f9b8bc9b4eae8612ce17439391e Mon Sep 17 00:00:00 2001 From: "north.lin" <37775475+qg-lin@users.noreply.github.com> Date: Thu, 26 Sep 2024 12:31:49 +0800 Subject: [PATCH 2/2] update doc and case --- .../connectors/flink-sources/mongodb-cdc.md | 3 +- .../connectors/flink-sources/mongodb-cdc.md | 3 +- .../mongodb/table/MongoDBConnectorITCase.java | 36 ++++++++++--------- 3 files changed, 24 insertions(+), 18 deletions(-) diff --git a/docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md b/docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md index 7675805e30..3bb21adc86 100644 --- a/docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md +++ b/docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md @@ -335,7 +335,7 @@ upstart 流需要一个唯一的密钥,所以我们必须声明 `_id` 作为 op_type STRING NOT NULL - 该行的操作类型。 + 该行的操作类型,值为:insert、update、replace、delete。
由于DeduplicateFunctionHelper#processLastRowOnChangelog返回删除前的行,delete类型暂时缺失。 @@ -346,6 +346,7 @@ CREATE TABLE products ( db_name STRING METADATA FROM 'database_name' VIRTUAL, collection_name STRING METADATA FROM 'collection_name' VIRTUAL, operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL, + operation_type STRING METADATA FROM 'op_type' VIRTUAL, _id STRING, // 必须声明 name STRING, weight DECIMAL(10,3), diff --git a/docs/content/docs/connectors/flink-sources/mongodb-cdc.md b/docs/content/docs/connectors/flink-sources/mongodb-cdc.md index be7e3cace6..138e186156 100644 --- a/docs/content/docs/connectors/flink-sources/mongodb-cdc.md +++ b/docs/content/docs/connectors/flink-sources/mongodb-cdc.md @@ -360,7 +360,7 @@ The following format metadata can be exposed as read-only (VIRTUAL) columns in a op_type STRING NOT NULL - Operation type of the row. + Operation type of the row, the value is one of: insert,update,replace,delete.
Since DeduplicateFunctionHelper#processLastRowOnChangelog returns the row before deletion, the `delete` type is temporarily missing. @@ -371,6 +371,7 @@ CREATE TABLE products ( db_name STRING METADATA FROM 'database_name' VIRTUAL, collection_name STRING METADATA FROM 'collection_name' VIRTUAL, operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL, + operation_type STRING METADATA FROM 'op_type' VIRTUAL, _id STRING, // must be declared name STRING, weight DECIMAL(10,3), diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBConnectorITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBConnectorITCase.java index 19d47856e1..e05b026648 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBConnectorITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBConnectorITCase.java @@ -482,6 +482,7 @@ public void testMetadataColumns() throws Exception { + " weight DECIMAL(10,3)," + " db_name STRING METADATA FROM 'database_name' VIRTUAL," + " collection_name STRING METADATA VIRTUAL," + + " op_type STRING METADATA VIRTUAL," + " PRIMARY KEY (_id) NOT ENFORCED" + ") WITH (" + " 'connector' = 'mongodb-cdc'," @@ -508,6 +509,7 @@ public void testMetadataColumns() throws Exception { + " weight DECIMAL(10,3)," + " database_name STRING," + " collection_name STRING," + + " op_type STRING," + " PRIMARY KEY (_id) NOT ENFORCED" + ") WITH (" + " 'connector' = 'values'," @@ -562,22 +564,24 @@ public void testMetadataColumns() throws Exception { List expected = Stream.of( - "+I(100000000000000000000101,scooter,Small 2-wheel scooter,3.140,%s,products)", - "+I(100000000000000000000102,car battery,12V car battery,8.100,%s,products)", - "+I(100000000000000000000103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.800,%s,products)", - "+I(100000000000000000000104,hammer,12oz carpenter''s hammer,0.750,%s,products)", - "+I(100000000000000000000105,hammer,12oz carpenter''s hammer,0.875,%s,products)", - "+I(100000000000000000000106,hammer,12oz carpenter''s hammer,1.000,%s,products)", - "+I(100000000000000000000107,rocks,box of assorted rocks,5.300,%s,products)", - "+I(100000000000000000000108,jacket,water resistent black wind breaker,0.100,%s,products)", - "+I(100000000000000000000109,spare tire,24 inch spare tire,22.200,%s,products)", - "+I(100000000000000000000110,jacket,water resistent white wind breaker,0.200,%s,products)", - "+I(100000000000000000000111,scooter,Big 2-wheel scooter,5.180,%s,products)", - "+U(100000000000000000000106,hammer,18oz carpenter hammer,1.000,%s,products)", - "+U(100000000000000000000107,rocks,box of assorted rocks,5.100,%s,products)", - "+U(100000000000000000000110,jacket,new water resistent white wind breaker,0.500,%s,products)", - "+U(100000000000000000000111,scooter,Big 2-wheel scooter,5.170,%s,products)", - "-D(100000000000000000000111,scooter,Big 2-wheel scooter,5.170,%s,products)") + "+I(100000000000000000000101,scooter,Small 2-wheel scooter,3.140,%s,products,insert)", + "+I(100000000000000000000102,car battery,12V car battery,8.100,%s,products,insert)", + "+I(100000000000000000000103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.800,%s,products,insert)", + "+I(100000000000000000000104,hammer,12oz carpenter''s hammer,0.750,%s,products,insert)", + "+I(100000000000000000000105,hammer,12oz carpenter''s hammer,0.875,%s,products,insert)", + "+I(100000000000000000000106,hammer,12oz carpenter''s hammer,1.000,%s,products,insert)", + "+I(100000000000000000000107,rocks,box of assorted rocks,5.300,%s,products,insert)", + "+I(100000000000000000000108,jacket,water resistent black wind breaker,0.100,%s,products,insert)", + "+I(100000000000000000000109,spare tire,24 inch spare tire,22.200,%s,products,insert)", + "+I(100000000000000000000110,jacket,water resistent white wind breaker,0.200,%s,products,insert)", + "+I(100000000000000000000111,scooter,Big 2-wheel scooter,5.180,%s,products,insert)", + "+U(100000000000000000000106,hammer,18oz carpenter hammer,1.000,%s,products,update)", + "+U(100000000000000000000107,rocks,box of assorted rocks,5.100,%s,products,update)", + "+U(100000000000000000000110,jacket,new water resistent white wind breaker,0.500,%s,products,update)", + "+U(100000000000000000000111,scooter,Big 2-wheel scooter,5.170,%s,products,update)", + // NOTE: DeduplicateFunctionHelper#processLastRowOnChangelog returns + // the preRow before deletion, so the `op_type` is 'update' here. + "-D(100000000000000000000111,scooter,Big 2-wheel scooter,5.170,%s,products,update)") .map(s -> String.format(s, database)) .sorted() .collect(Collectors.toList());