Skip to content

Commit

Permalink
[Flink] Support other options for external database export (lakesoul-…
Browse files Browse the repository at this point in the history
…io#539)

* support other options start with -D,add checkpoint path,interval when xsync external db

Signed-off-by: ChenYunHey <[email protected]>

* add stream load options when create table

Signed-off-by: ChenYunHey <[email protected]>

---------

Signed-off-by: ChenYunHey <[email protected]>
  • Loading branch information
ChenYunHey authored Sep 10, 2024
1 parent 0566a1f commit cf87946
Showing 1 changed file with 45 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import java.util.*;

import static org.apache.flink.lakesoul.entry.MongoSinkUtils.*;
import static org.apache.flink.lakesoul.tool.JobOptions.FLINK_CHECKPOINT;
import static org.apache.flink.lakesoul.tool.JobOptions.JOB_CHECKPOINT_INTERVAL;
import static org.apache.flink.lakesoul.tool.LakeSoulSinkDatabasesOptions.*;

public class SyncDatabase {
Expand All @@ -45,6 +47,8 @@ public class SyncDatabase {
static boolean useBatch;
static int sinkParallelism;
static String jdbcOrDorisOptions;
static int checkpointInterval;
static String checkpointPath;

public static void main(String[] args) throws Exception {
StringBuilder connectorOptions = new StringBuilder();
Expand All @@ -55,11 +59,13 @@ public static void main(String[] args) throws Exception {
targetDatabase = parameter.get(TARGET_DB_DB_NAME.key());
targetTableName = parameter.get(TARGET_DB_TABLE_NAME.key()).toLowerCase();
url = parameter.get(TARGET_DB_URL.key());
checkpointInterval = parameter.getInt(JOB_CHECKPOINT_INTERVAL.key(), JOB_CHECKPOINT_INTERVAL.defaultValue());
checkpointPath = parameter.get(FLINK_CHECKPOINT.key());
if (dbType.equals("mysql") || dbType.equals("postgresql") || dbType.equals("doris")){
for (int i = 0; i < args.length; i++) {
if ( args[i].startsWith("--jdbc") || args[i].startsWith("--doris")){
if ( args[i].startsWith("--D")){
connectorOptions.append("'")
.append(args[i].substring(7))
.append(args[i].substring(3))
.append("'")
.append("=")
.append("'")
Expand All @@ -78,12 +84,11 @@ public static void main(String[] args) throws Exception {
}
sinkParallelism = parameter.getInt(SINK_PARALLELISM.key(), SINK_PARALLELISM.defaultValue());
useBatch = parameter.getBoolean(BATHC_STREAM_SINK.key(), BATHC_STREAM_SINK.defaultValue());

String fenodes = parameter.get(DORIS_FENODES.key(), DORIS_FENODES.defaultValue());
Configuration conf = new Configuration();
conf.setString(RestOptions.BIND_PORT, "8081-8089");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
env.setParallelism(sinkParallelism);
env.getCheckpointConfig().setCheckpointStorage(checkpointPath);

switch (dbType) {
case "mysql":
Expand All @@ -93,6 +98,7 @@ public static void main(String[] args) throws Exception {
xsyncToPg(env);
break;
case "doris":
String fenodes = parameter.get(DORIS_FENODES.key(), DORIS_FENODES.defaultValue());
xsyncToDoris(env, fenodes);
break;
case "mongodb":
Expand Down Expand Up @@ -187,7 +193,7 @@ public static String[] getDorisFieldTypes(DataType[] fieldTypes) {
String[] stringFieldTypes = new String[fieldTypes.length];
for (int i = 0; i < fieldTypes.length; i++) {
if (fieldTypes[i].getLogicalType() instanceof TimestampType) {
stringFieldTypes[i] = "DATETIME";
stringFieldTypes[i] = "TIMESTAMP";
} else if (fieldTypes[i].getLogicalType() instanceof VarCharType) {
stringFieldTypes[i] = "VARCHAR";
} else if (fieldTypes[i].getLogicalType() instanceof LocalZonedTimestampType ) {
Expand Down Expand Up @@ -221,7 +227,7 @@ public static void xsyncToPg(StreamExecutionEnvironment env) throws SQLException
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
} else {
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
env.enableCheckpointing(checkpointInterval, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
}
StreamTableEnvironment tEnvs = StreamTableEnvironment.create(env);
Expand All @@ -241,14 +247,19 @@ public static void xsyncToPg(StreamExecutionEnvironment env) throws SQLException
statement.executeUpdate(createTableSql.toString());
StringBuilder coulmns = new StringBuilder();
for (int i = 0; i < fieldDataTypes.length; i++) {
if (stringFieldsTypes[i].equals("BYTEA")) {
coulmns.append("`").append(fieldNames[i]).append("` ").append("BYTES");
} else if (stringFieldsTypes[i].equals("TEXT")) {
coulmns.append("`").append(fieldNames[i]).append("` ").append("VARCHAR");
} else if (stringFieldsTypes[i].equals("FLOAT8")) {
coulmns.append("`").append(fieldNames[i]).append("`").append("DOUBLE");
} else {
coulmns.append("`").append(fieldNames[i]).append("` ").append(stringFieldsTypes[i]);
switch (stringFieldsTypes[i]) {
case "BYTEA":
coulmns.append("`").append(fieldNames[i]).append("` ").append("BYTES");
break;
case "TEXT":
coulmns.append("`").append(fieldNames[i]).append("` ").append("VARCHAR");
break;
case "FLOAT8":
coulmns.append("`").append(fieldNames[i]).append("`").append("DOUBLE");
break;
default:
coulmns.append("`").append(fieldNames[i]).append("` ").append(stringFieldsTypes[i]);
break;
}
if (i < fieldDataTypes.length - 1) {
coulmns.append(",");
Expand Down Expand Up @@ -286,7 +297,7 @@ public static void xsyncToMysql(StreamExecutionEnvironment env) throws SQLExcept
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
} else {
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
env.enableCheckpointing(checkpointInterval, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
}
StreamTableEnvironment tEnvs = StreamTableEnvironment.create(env);
Expand Down Expand Up @@ -351,7 +362,7 @@ public static void xsyncToDoris(StreamExecutionEnvironment env, String fenodes)
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
} else {
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
env.enableCheckpointing(checkpointInterval, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
}
StreamTableEnvironment tEnvs = StreamTableEnvironment.create(env);
Expand All @@ -372,11 +383,26 @@ public static void xsyncToDoris(StreamExecutionEnvironment env, String fenodes)
String sql;
if (jdbcOrDorisOptions == null){
sql = String.format(
"create table %s(%s) with ('connector' = '%s', 'jdbc-url' = '%s', 'fenodes' = '%s', 'table.identifier' = '%s', 'username' = '%s', 'password' = '%s')",
"create table %s(%s) with ('connector' = '%s'," +
" 'jdbc-url' = '%s'," +
" 'fenodes' = '%s'," +
" 'table.identifier' = '%s'," +
" 'username' = '%s'," +
" 'password' = '%s'," +
" 'sink.properties.format' = 'json'," +
" 'sink.properties.read_json_by_line' = 'true')",
targetTableName, coulmns, "doris", jdbcUrl, fenodes, targetDatabase + "." + targetTableName, username, password);
}else {
sql = String.format(
"create table %s(%s) with ('connector' = '%s', 'jdbc-url' = '%s', 'fenodes' = '%s', 'table.identifier' = '%s', 'username' = '%s', 'password' = '%s', %s)",
"create table %s(%s) with ('connector' = '%s'," +
" 'jdbc-url' = '%s'," +
" 'fenodes' = '%s'," +
" 'table.identifier' = '%s'," +
" 'username' = '%s'," +
" 'password' = '%s'," +
" 'sink.properties.format' = 'json'," +
" 'sink.properties.read_json_by_line' = 'true'," +
" %s)",
targetTableName, coulmns, "doris", jdbcUrl, fenodes, targetDatabase + "." + targetTableName, username, password, jdbcOrDorisOptions);
}

Expand All @@ -393,7 +419,7 @@ public static void xsyncToMongodb(StreamExecutionEnvironment env,
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
} else {
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
env.enableCheckpointing(checkpointInterval, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
}
StreamTableEnvironment tEnvs = StreamTableEnvironment.create(env);
Expand Down

0 comments on commit cf87946

Please sign in to comment.