From cf8794675d0d70405a77b9d7c2508a3f97058eff Mon Sep 17 00:00:00 2001 From: ChenYunHey <90120383+ChenYunHey@users.noreply.github.com> Date: Tue, 10 Sep 2024 15:12:19 +0800 Subject: [PATCH] [Flink] Support other options for external database export (#539) * support other options start with -D,add checkpoint path,interval when xsync external db Signed-off-by: ChenYunHey <1908166778@qq.com> * add stream load options when create table Signed-off-by: ChenYunHey <1908166778@qq.com> --------- Signed-off-by: ChenYunHey <1908166778@qq.com> --- .../flink/lakesoul/entry/SyncDatabase.java | 64 +++++++++++++------ 1 file changed, 45 insertions(+), 19 deletions(-) diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/SyncDatabase.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/SyncDatabase.java index a8c2dc9ef..37f5f2302 100644 --- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/SyncDatabase.java +++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/SyncDatabase.java @@ -30,6 +30,8 @@ import java.util.*; import static org.apache.flink.lakesoul.entry.MongoSinkUtils.*; +import static org.apache.flink.lakesoul.tool.JobOptions.FLINK_CHECKPOINT; +import static org.apache.flink.lakesoul.tool.JobOptions.JOB_CHECKPOINT_INTERVAL; import static org.apache.flink.lakesoul.tool.LakeSoulSinkDatabasesOptions.*; public class SyncDatabase { @@ -45,6 +47,8 @@ public class SyncDatabase { static boolean useBatch; static int sinkParallelism; static String jdbcOrDorisOptions; + static int checkpointInterval; + static String checkpointPath; public static void main(String[] args) throws Exception { StringBuilder connectorOptions = new StringBuilder(); @@ -55,11 +59,13 @@ public static void main(String[] args) throws Exception { targetDatabase = parameter.get(TARGET_DB_DB_NAME.key()); targetTableName = parameter.get(TARGET_DB_TABLE_NAME.key()).toLowerCase(); url = parameter.get(TARGET_DB_URL.key()); + checkpointInterval = parameter.getInt(JOB_CHECKPOINT_INTERVAL.key(), JOB_CHECKPOINT_INTERVAL.defaultValue()); + checkpointPath = parameter.get(FLINK_CHECKPOINT.key()); if (dbType.equals("mysql") || dbType.equals("postgresql") || dbType.equals("doris")){ for (int i = 0; i < args.length; i++) { - if ( args[i].startsWith("--jdbc") || args[i].startsWith("--doris")){ + if ( args[i].startsWith("--D")){ connectorOptions.append("'") - .append(args[i].substring(7)) + .append(args[i].substring(3)) .append("'") .append("=") .append("'") @@ -78,12 +84,11 @@ public static void main(String[] args) throws Exception { } sinkParallelism = parameter.getInt(SINK_PARALLELISM.key(), SINK_PARALLELISM.defaultValue()); useBatch = parameter.getBoolean(BATHC_STREAM_SINK.key(), BATHC_STREAM_SINK.defaultValue()); - - String fenodes = parameter.get(DORIS_FENODES.key(), DORIS_FENODES.defaultValue()); Configuration conf = new Configuration(); conf.setString(RestOptions.BIND_PORT, "8081-8089"); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); env.setParallelism(sinkParallelism); + env.getCheckpointConfig().setCheckpointStorage(checkpointPath); switch (dbType) { case "mysql": @@ -93,6 +98,7 @@ public static void main(String[] args) throws Exception { xsyncToPg(env); break; case "doris": + String fenodes = parameter.get(DORIS_FENODES.key(), DORIS_FENODES.defaultValue()); xsyncToDoris(env, fenodes); break; case "mongodb": @@ -187,7 +193,7 @@ public static String[] getDorisFieldTypes(DataType[] fieldTypes) { String[] stringFieldTypes = new String[fieldTypes.length]; for (int i = 0; i < fieldTypes.length; i++) { if (fieldTypes[i].getLogicalType() instanceof TimestampType) { - stringFieldTypes[i] = "DATETIME"; + stringFieldTypes[i] = "TIMESTAMP"; } else if (fieldTypes[i].getLogicalType() instanceof VarCharType) { stringFieldTypes[i] = "VARCHAR"; } else if (fieldTypes[i].getLogicalType() instanceof LocalZonedTimestampType ) { @@ -221,7 +227,7 @@ public static void xsyncToPg(StreamExecutionEnvironment env) throws SQLException env.setRuntimeMode(RuntimeExecutionMode.BATCH); } else { env.setRuntimeMode(RuntimeExecutionMode.STREAMING); - env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE); + env.enableCheckpointing(checkpointInterval, CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); } StreamTableEnvironment tEnvs = StreamTableEnvironment.create(env); @@ -241,14 +247,19 @@ public static void xsyncToPg(StreamExecutionEnvironment env) throws SQLException statement.executeUpdate(createTableSql.toString()); StringBuilder coulmns = new StringBuilder(); for (int i = 0; i < fieldDataTypes.length; i++) { - if (stringFieldsTypes[i].equals("BYTEA")) { - coulmns.append("`").append(fieldNames[i]).append("` ").append("BYTES"); - } else if (stringFieldsTypes[i].equals("TEXT")) { - coulmns.append("`").append(fieldNames[i]).append("` ").append("VARCHAR"); - } else if (stringFieldsTypes[i].equals("FLOAT8")) { - coulmns.append("`").append(fieldNames[i]).append("`").append("DOUBLE"); - } else { - coulmns.append("`").append(fieldNames[i]).append("` ").append(stringFieldsTypes[i]); + switch (stringFieldsTypes[i]) { + case "BYTEA": + coulmns.append("`").append(fieldNames[i]).append("` ").append("BYTES"); + break; + case "TEXT": + coulmns.append("`").append(fieldNames[i]).append("` ").append("VARCHAR"); + break; + case "FLOAT8": + coulmns.append("`").append(fieldNames[i]).append("`").append("DOUBLE"); + break; + default: + coulmns.append("`").append(fieldNames[i]).append("` ").append(stringFieldsTypes[i]); + break; } if (i < fieldDataTypes.length - 1) { coulmns.append(","); @@ -286,7 +297,7 @@ public static void xsyncToMysql(StreamExecutionEnvironment env) throws SQLExcept env.setRuntimeMode(RuntimeExecutionMode.BATCH); } else { env.setRuntimeMode(RuntimeExecutionMode.STREAMING); - env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE); + env.enableCheckpointing(checkpointInterval, CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); } StreamTableEnvironment tEnvs = StreamTableEnvironment.create(env); @@ -351,7 +362,7 @@ public static void xsyncToDoris(StreamExecutionEnvironment env, String fenodes) env.setRuntimeMode(RuntimeExecutionMode.BATCH); } else { env.setRuntimeMode(RuntimeExecutionMode.STREAMING); - env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE); + env.enableCheckpointing(checkpointInterval, CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); } StreamTableEnvironment tEnvs = StreamTableEnvironment.create(env); @@ -372,11 +383,26 @@ public static void xsyncToDoris(StreamExecutionEnvironment env, String fenodes) String sql; if (jdbcOrDorisOptions == null){ sql = String.format( - "create table %s(%s) with ('connector' = '%s', 'jdbc-url' = '%s', 'fenodes' = '%s', 'table.identifier' = '%s', 'username' = '%s', 'password' = '%s')", + "create table %s(%s) with ('connector' = '%s'," + + " 'jdbc-url' = '%s'," + + " 'fenodes' = '%s'," + + " 'table.identifier' = '%s'," + + " 'username' = '%s'," + + " 'password' = '%s'," + + " 'sink.properties.format' = 'json'," + + " 'sink.properties.read_json_by_line' = 'true')", targetTableName, coulmns, "doris", jdbcUrl, fenodes, targetDatabase + "." + targetTableName, username, password); }else { sql = String.format( - "create table %s(%s) with ('connector' = '%s', 'jdbc-url' = '%s', 'fenodes' = '%s', 'table.identifier' = '%s', 'username' = '%s', 'password' = '%s', %s)", + "create table %s(%s) with ('connector' = '%s'," + + " 'jdbc-url' = '%s'," + + " 'fenodes' = '%s'," + + " 'table.identifier' = '%s'," + + " 'username' = '%s'," + + " 'password' = '%s'," + + " 'sink.properties.format' = 'json'," + + " 'sink.properties.read_json_by_line' = 'true'," + + " %s)", targetTableName, coulmns, "doris", jdbcUrl, fenodes, targetDatabase + "." + targetTableName, username, password, jdbcOrDorisOptions); } @@ -393,7 +419,7 @@ public static void xsyncToMongodb(StreamExecutionEnvironment env, env.setRuntimeMode(RuntimeExecutionMode.BATCH); } else { env.setRuntimeMode(RuntimeExecutionMode.STREAMING); - env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE); + env.enableCheckpointing(checkpointInterval, CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); } StreamTableEnvironment tEnvs = StreamTableEnvironment.create(env);