[Flink] Support MongoDB CDC Import/Export (lakesoul-io#460)
* support mongodb cdc

Signed-off-by: ChenYunHey <[email protected]>

* support mongodb sync

Signed-off-by: ChenYunHey <[email protected]>

* fix some bugs

Signed-off-by: ChenYunHey <[email protected]>

* lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/committer/LakeSoulSinkGlobalCommitter.java

Signed-off-by: ChenYunHey <[email protected]>

* fix some bugs

Signed-off-by: ChenYunHey <[email protected]>

* add mongodb cdc docs

Signed-off-by: ChenYunHey <[email protected]>

* Add some documentation

Signed-off-by: ChenYunHey <[email protected]>

* format the code

Signed-off-by: ChenYunHey <[email protected]>

* Fix field attribute issues

Signed-off-by: ChenYunHey <[email protected]>

* test

Signed-off-by: ChenYunHey <[email protected]>

* test

Signed-off-by: ChenYunHey <[email protected]>

* fix data delete event

Signed-off-by: ChenYunHey <[email protected]>

* Remove irrelevant code

Signed-off-by: ChenYunHey <[email protected]>

---------

Signed-off-by: ChenYunHey <[email protected]>
ChenYunHey authored Apr 1, 2024
1 parent 87dd675 commit e4fa2bc
Showing 17 changed files with 989 additions and 176 deletions.
32 changes: 27 additions & 5 deletions lakesoul-flink/pom.xml
@@ -24,11 +24,17 @@ SPDX-License-Identifier: Apache-2.0
<scala.binary.version>2.12</scala.binary.version>
<scala.version>2.12.10</scala.version>
<log4j.version>2.17.2</log4j.version>
<cdc.version>2.4.2</cdc.version>
<cdc.version>3.0.0</cdc.version>
<local.scope>provided</local.scope>
</properties>

<dependencies>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest</artifactId>
<version>2.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.dmetasoul</groupId>
<artifactId>lakesoul-common</artifactId>
@@ -122,26 +128,42 @@ SPDX-License-Identifier: Apache-2.0
<scope>test</scope>
<type>test-jar</type>
</dependency>

<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-sql-connector-postgres-cdc</artifactId>
<artifactId>flink-sql-connector-sqlserver-cdc</artifactId>
<version>${cdc.version}</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-sql-connector-sqlserver-cdc</artifactId>
<artifactId>flink-sql-connector-mysql-cdc</artifactId>
<version>${cdc.version}</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-sql-connector-mysql-cdc</artifactId>
<artifactId>flink-sql-connector-oracle-cdc</artifactId>
<version>${cdc.version}</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-sql-connector-oracle-cdc</artifactId>
<artifactId>flink-sql-connector-postgres-cdc</artifactId>
<version>${cdc.version}</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-sql-connector-mongodb-cdc</artifactId>
<version>${cdc.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-mongodb</artifactId>
<version>1.0.1-1.17</version>
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>bson</artifactId>
<version>4.3.4</version>
</dependency>
<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>flink-doris-connector-1.17</artifactId>
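The MongoDB support added here pulls in three separate artifacts: the Ververica flink-sql-connector-mongodb-cdc source, Flink's own flink-connector-mongodb, and the standalone bson library. A quick classpath check before submitting a job can catch a missing or mis-versioned jar. A minimal sketch, illustrative only and not part of this commit (class names taken from the dependencies above):

public class MongoClasspathCheck {
    public static void main(String[] args) throws ClassNotFoundException {
        // com.ververica:flink-sql-connector-mongodb-cdc (CDC source side of the import path)
        Class.forName("com.ververica.cdc.connectors.mongodb.source.MongoDBSource");
        // org.apache.flink:flink-connector-mongodb (sink side of the export path)
        Class.forName("org.apache.flink.connector.mongodb.sink.MongoSink");
        // org.mongodb:bson (BSON document model used by MongoSinkUtils)
        Class.forName("org.bson.BsonDocument");
        System.out.println("MongoDB connector classes resolved on the classpath.");
    }
}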
@@ -11,6 +11,8 @@
import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder;
import com.ververica.cdc.connectors.oracle.source.OracleSourceBuilder;
import com.ververica.cdc.connectors.postgres.source.PostgresSourceBuilder;
import com.ververica.cdc.connectors.mongodb.source.MongoDBSource;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import com.ververica.cdc.connectors.sqlserver.source.SqlServerSourceBuilder;
@@ -52,6 +54,7 @@ public class JdbcCDC {
private static String[] tableList;
private static String serverTimezone;
private static String pluginName;
private static int batchSize;

public static void main(String[] args) throws Exception {
ParameterTool parameter = ParameterTool.fromArgs(args);
@@ -62,7 +65,7 @@ public static void main(String[] args) throws Exception {
host = parameter.get(SOURCE_DB_HOST.key());
port = parameter.getInt(SOURCE_DB_PORT.key(), MysqlDBManager.DEFAULT_MYSQL_PORT);
//Postgres Oracle
if (dbType.equalsIgnoreCase("oracle") || dbType.equalsIgnoreCase("postgres")) {
if (dbType.equalsIgnoreCase("oracle") || dbType.equalsIgnoreCase("postgres") ) {
schemaList = parameter.get(SOURCE_DB_SCHEMA_LIST.key()).split(",");
String[] tables = parameter.get(SOURCE_DB_SCHEMA_TABLES.key()).split(",");
tableList = new String[tables.length];
@@ -71,7 +74,11 @@ public static void main(String[] args) throws Exception {
}
splitSize = parameter.getInt(SOURCE_DB_SPLIT_SIZE.key(), SOURCE_DB_SPLIT_SIZE.defaultValue());
}
if (dbType.equalsIgnoreCase("sqlserver")){
if (dbType.equalsIgnoreCase("sqlserver") ){
tableList = parameter.get(SOURCE_DB_SCHEMA_TABLES.key()).split(",");
}
if ( dbType.equalsIgnoreCase("mongodb")){
batchSize = parameter.getInt(BATCH_SIZE.key(), BATCH_SIZE.defaultValue());
tableList = parameter.get(SOURCE_DB_SCHEMA_TABLES.key()).split(",");
}
pluginName = parameter.get(PLUGIN_NAME.key(), PLUGIN_NAME.defaultValue());
@@ -92,6 +99,7 @@ public static void main(String[] args) throws Exception {
conf.set(SOURCE_DB_PORT, port);
conf.set(WAREHOUSE_PATH, databasePrefixPath);
conf.set(SERVER_TIME_ZONE, serverTimezone);
conf.set(SOURCE_DB_TYPE,dbType);

// parameters for multi-table DML sink
conf.set(LakeSoulSinkOptions.USE_CDC, true);
@@ -136,9 +144,12 @@ public static void main(String[] args) throws Exception {
if (dbType.equalsIgnoreCase("oracle")) {
oracleCdc(lakeSoulRecordConvert, conf, env);
}
if (dbType.equalsIgnoreCase("sqlserver")){
if (dbType.equalsIgnoreCase("sqlserver")) {
sqlserverCdc(lakeSoulRecordConvert, conf, env);
}
if (dbType.equalsIgnoreCase("mongodb")) {
mongoCdc(lakeSoulRecordConvert, conf, env);
}

}

@@ -269,11 +280,39 @@ public static void sqlserverCdc(LakeSoulRecordConvert lakeSoulRecordConvert, Con
LakeSoulMultiTableSinkStreamBuilder
builder =
new LakeSoulMultiTableSinkStreamBuilder(sqlServerSource, context, lakeSoulRecordConvert);

DataStreamSource<BinarySourceRecord> source = builder.buildMultiTableSource("Sqlserver Source");

DataStream<BinarySourceRecord> stream = builder.buildHashPartitionedCDCStream(source);
DataStreamSink<BinarySourceRecord> dmlSink = builder.buildLakeSoulDMLSink(stream);
env.execute("LakeSoul CDC Sink From sqlserver Database " + dbName);
}

private static void mongoCdc(LakeSoulRecordConvert lakeSoulRecordConvert, Configuration conf, StreamExecutionEnvironment env) throws Exception {
MongoDBSource<BinarySourceRecord> mongoSource =
MongoDBSource.<BinarySourceRecord>builder()
.hosts(host)
.databaseList(dbName)
.collectionList(tableList)
.startupOptions(StartupOptions.initial())
.scanFullChangelog(true)
.batchSize(batchSize)
.username(userName)
.password(passWord)
.deserializer(new BinaryDebeziumDeserializationSchema(lakeSoulRecordConvert, conf.getString(WAREHOUSE_PATH)))
.build();
NameSpaceManager manager = new NameSpaceManager();
manager.importOrSyncLakeSoulNamespace(dbName);
LakeSoulMultiTableSinkStreamBuilder.Context context = new LakeSoulMultiTableSinkStreamBuilder.Context();
context.env = env;
context.conf = conf;
LakeSoulMultiTableSinkStreamBuilder
builder =
new LakeSoulMultiTableSinkStreamBuilder(mongoSource, context, lakeSoulRecordConvert);
DataStreamSource<BinarySourceRecord> source = builder.buildMultiTableSource("mongodb Source");

DataStream<BinarySourceRecord> stream = builder.buildHashPartitionedCDCStream(source);
DataStreamSink<BinarySourceRecord> dmlSink = builder.buildLakeSoulDMLSink(stream);
env.execute("LakeSoul CDC Sink From mongo Database " + dbName);

}
}
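The mongoCdc path above feeds the MongoDB change stream straight into the LakeSoul multi-table sink. For connectivity testing, the same MongoDBSource builder surface can be exercised in a standalone job that simply prints the change stream. A minimal sketch, assuming a reachable MongoDB replica set with hypothetical host, database, collection, and credentials, and using the connector's stock JSON deserializer rather than BinaryDebeziumDeserializationSchema:

import com.ververica.cdc.connectors.mongodb.source.MongoDBSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MongoCdcSmokeTest {
    public static void main(String[] args) throws Exception {
        // Same builder calls as mongoCdc(...), but with the plain JSON deserializer
        // so change events can be printed directly.
        MongoDBSource<String> source = MongoDBSource.<String>builder()
                .hosts("localhost:27017")             // assumed host:port
                .databaseList("test_db")              // assumed database
                .collectionList("test_db.test_coll")  // assumed fully qualified collection
                .username("flinkuser")                // assumed credentials
                .password("flinkpw")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(3000);
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "MongoDB CDC smoke test")
                .print();
        env.execute("Print MongoDB change stream");
    }
}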
@@ -0,0 +1,170 @@
// SPDX-FileCopyrightText: 2023 LakeSoul Contributors
//
// SPDX-License-Identifier: Apache-2.0

package org.apache.flink.lakesoul.entry;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.InsertOneModel;
import com.mongodb.client.model.WriteModel;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.mongodb.sink.writer.context.MongoSinkContext;
import org.apache.flink.connector.mongodb.sink.writer.serializer.MongoSerializationSchema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.BinaryType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
import org.bson.*;
import org.bson.types.Decimal128;

import java.io.Serializable;
import java.math.BigDecimal;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;

public class MongoSinkUtils {

static Table coll;
static List<String> structNameFiledList;

public static void createMongoColl(String database, String collName, String uri) {
MongoClient mongoClient = MongoClients.create(uri);
MongoDatabase mongoDatabase = mongoClient.getDatabase(database);
mongoDatabase.createCollection(collName);
mongoClient.close();
}

public static class MyMongoSerializationSchema implements MongoSerializationSchema<Tuple2<Boolean, Row>>, Serializable {
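// Serializes a (changeFlag, Row) record into an InsertOneModel: every non-null field of the Row is
// mapped to a BSON value, using field names and types taken from the target table's schema (coll).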
@Override
public WriteModel<BsonDocument> serialize(Tuple2<Boolean, Row> record, MongoSinkContext context) {
Row row = record.f1; // Extract the Row object from the Tuple2
BsonDocument document = new BsonDocument();
int fieldCount = row.getArity();
DataType[] fieldDataTypes = coll.getSchema().getFieldDataTypes();
for (int i = 0; i < fieldCount; i++) {
String fieldName = coll.getSchema().getFieldNames()[i];
Object fieldValue = row.getField(i);
if (fieldValue instanceof Row) {
DataType dataType = fieldDataTypes[i];
RowType rowType = (RowType) dataType.getLogicalType();
structNameFiledList = traverseRow(rowType);
}
if (fieldValue != null) {
try {
document.append(fieldName, convertTonBsonValue(fieldValue));
} catch (ParseException e) {
throw new RuntimeException(e);
}
}
}
return new InsertOneModel<>(document);
}


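// Recursively maps a single field value to its BSON counterpart; nested Rows consume field names
// from structNameFiledList, which must have been populated by traverseRow() beforehand.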
public static BsonValue convertTonBsonValue(Object value) throws ParseException {
if (value == null) {
return new BsonNull();
} else if (value instanceof Integer) {
return new BsonInt32((Integer) value);
} else if (value instanceof Long) {
return new BsonInt64((Long) value);
} else if (value instanceof String) {
return new BsonString((String) value);
} else if (value instanceof Boolean) {
return new BsonBoolean((Boolean) value);
} else if (value instanceof Double) {
return new BsonDouble((Double) value);
} else if (value instanceof BigDecimal) {
return new BsonDecimal128(new Decimal128((BigDecimal) value));
} else if (value instanceof Date) {
return new BsonDateTime(((Date) value).getTime());
} else if (value instanceof BinaryType) {
return new BsonBinary((byte[]) value);
} else if (value instanceof byte[]) {
return new BsonBinary((byte[]) value);
} else if (value instanceof Object[]) {
Object[] array = (Object[]) value;
BsonArray bsonArray = new BsonArray();
for (Object element : array) {
bsonArray.add(convertTonBsonValue(element));
}
return bsonArray;
} else if (isDateTimeString(value)) {
Date date = parseDateTime(value.toString());
return new BsonDateTime(date.getTime());
} else if (value instanceof Row) {
Row row = (Row) value;
BsonDocument bsonDocument = new BsonDocument();
for (int i = 0; i < row.getArity(); i++) {
Object fieldValue = row.getField(i);
List<String> stringList = new ArrayList<>(structNameFiledList);
String name = structNameFiledList.get(0);
stringList.remove(0);
structNameFiledList = stringList;
bsonDocument.append(name, convertTonBsonValue(fieldValue));
}
return bsonDocument;
} else {
throw new IllegalArgumentException("Unsupported data type: " + value.getClass());
}
}

public static List<String> traverseRow(RowType rowType) {
List<String> nameList = new ArrayList<>();
traverseField(rowType, nameList);
return nameList;
}

private static void traverseField(RowType rowType, List<String> nameList) {
for (int i = 0; i < rowType.getFieldCount(); i++) {
String fieldName = rowType.getFieldNames().get(i);
LogicalType fieldType = rowType.getTypeAt(i);
nameList.add(fieldName);
if (fieldType instanceof RowType) {
traverseField((RowType) fieldType, nameList);
}
}
}

public static List<String> findDirectNestedNames(JSONObject jsonObject, String targetFieldName, int currentLevel, int targetLevel) {
List<String> nestedNames = new ArrayList<>();
findDirectNestedNamesHelper(jsonObject, targetFieldName, currentLevel, targetLevel, nestedNames);
return nestedNames;
}

public static void findDirectNestedNamesHelper(JSONObject jsonObject, String targetFieldName, int currentLevel, int targetLevel, List<String> nestedNames) {
if (currentLevel == targetLevel) {
if (jsonObject.getString("name").equals(targetFieldName)) {
JSONArray children = jsonObject.getJSONArray("children");
for (Object obj : children) {
JSONObject child = (JSONObject) obj;
nestedNames.add(child.getString("name"));
}
}
} else {
JSONArray children = jsonObject.getJSONArray("children");
for (Object obj : children) {
JSONObject child = (JSONObject) obj;
findDirectNestedNamesHelper(child, targetFieldName, currentLevel + 1, targetLevel, nestedNames);
}
}
}

public static boolean isDateTimeString(Object value) {
return value.toString().matches("^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}Z$");
}

public static Date parseDateTime(String value) throws ParseException {
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
return dateFormat.parse(value);
}
}
}
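To illustrate the mapping performed by convertTonBsonValue, a small standalone usage sketch with made-up values (only scalar and array cases are shown, since the nested-Row path additionally requires structNameFiledList to be populated via traverseRow):

import java.math.BigDecimal;
import java.text.ParseException;
import org.apache.flink.lakesoul.entry.MongoSinkUtils.MyMongoSerializationSchema;
import org.bson.BsonValue;

public class BsonConversionExample {
    public static void main(String[] args) throws ParseException {
        // Scalars map onto the corresponding BSON wrapper types.
        BsonValue i = MyMongoSerializationSchema.convertTonBsonValue(42);                      // BsonInt32
        BsonValue l = MyMongoSerializationSchema.convertTonBsonValue(42L);                     // BsonInt64
        BsonValue s = MyMongoSerializationSchema.convertTonBsonValue("lakesoul");              // BsonString
        BsonValue d = MyMongoSerializationSchema.convertTonBsonValue(new BigDecimal("3.14"));  // BsonDecimal128
        // Object[] values are converted element by element into a BsonArray.
        BsonValue a = MyMongoSerializationSchema.convertTonBsonValue(new Object[]{1, "two", 3.0});
        System.out.println(i + " " + l + " " + s + " " + d + " " + a);
    }
}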