diff --git a/chunjun-connectors/chunjun-connector-gbase8s/pom.xml b/chunjun-connectors/chunjun-connector-gbase8s/pom.xml new file mode 100644 index 0000000000..d169f9262c --- /dev/null +++ b/chunjun-connectors/chunjun-connector-gbase8s/pom.xml @@ -0,0 +1,63 @@ + + + + + + chunjun-connectors + com.dtstack.chunjun + ${revision} + + 4.0.0 + + chunjun-connector-gbase8s + ChunJun : Connectors : GBase8s + + + gbase8s + + + + + com.dtstack.chunjun + chunjun-connector-jdbc-base + ${project.version} + + + com.gbasedbt.jdbc.Driver + gbasedbt + 3.5.1_1_d0c87a + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + org.apache.maven.plugins + maven-antrun-plugin + + + + diff --git a/chunjun-connectors/chunjun-connector-gbase8s/src/main/java/com/dtstack/chunjun/connector/gbase8s/converter/Gbase8sRawTypeConverter.java b/chunjun-connectors/chunjun-connector-gbase8s/src/main/java/com/dtstack/chunjun/connector/gbase8s/converter/Gbase8sRawTypeConverter.java new file mode 100644 index 0000000000..d256ab82a4 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-gbase8s/src/main/java/com/dtstack/chunjun/connector/gbase8s/converter/Gbase8sRawTypeConverter.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.gbase8s.converter; + +import com.dtstack.chunjun.config.TypeConfig; +import com.dtstack.chunjun.throwable.UnsupportedTypeException; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.types.DataType; + +public class Gbase8sRawTypeConverter { + + public static DataType apply(TypeConfig type) { + switch (type.getType()) { + case "BIT": + return DataTypes.BOOLEAN(); + case "TINYINT": + return DataTypes.TINYINT(); + case "SMALLINT": + case "MEDIUMINT": + case "INT": + case "INTEGER": + case "INT24": + case "SERIAL": + return DataTypes.INT(); + case "BIGINT": + case "INT8": + case "BIGSERIAL": + case "SERIAL8": + return DataTypes.BIGINT(); + case "REAL": + case "FLOAT": + case "SMALLFLOAT": + return DataTypes.FLOAT(); + case "DECIMAL": + case "DEC": + case "NUMERIC": + case "MONEY": + // TODO 精度应该可以动态传进来? + return DataTypes.DECIMAL(38, 18); + case "DOUBLE": + case "PRECISION": + return DataTypes.DOUBLE(); + case "CHAR": + case "VARCHAR": + case "TINYTEXT": + case "TEXT": + case "MEDIUMTEXT": + case "LVARCHAR": + case "LONGTEXT": + case "JSON": + case "ENUM": + case "CHARACTER": + case "VARYING": + case "NCHAR": + case "SET": + return DataTypes.STRING(); + case "DATE": + return DataTypes.DATE(); + case "YEAR": + return DataTypes.INTERVAL(DataTypes.YEAR()); + case "TIME": + return DataTypes.TIME(); + case "TIMESTAMP": + return DataTypes.TIMESTAMP(); + case "DATETIME": + return DataTypes.TIMESTAMP(5); + case "TINYBLOB": + case "BLOB": + case "MEDIUMBLOB": + case "LONGBLOB": + case "BINARY": + case "VARBINARY": + case "GEOMETRY": + // BYTES 底层调用的是VARBINARY最大长度 + return DataTypes.BYTES(); + + default: + throw new UnsupportedTypeException(type); + } + } +} diff --git a/chunjun-connectors/chunjun-connector-gbase8s/src/main/java/com/dtstack/chunjun/connector/gbase8s/dialect/Gbase8sDialect.java b/chunjun-connectors/chunjun-connector-gbase8s/src/main/java/com/dtstack/chunjun/connector/gbase8s/dialect/Gbase8sDialect.java new file mode 100644 index 0000000000..5cea64d4a6 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-gbase8s/src/main/java/com/dtstack/chunjun/connector/gbase8s/dialect/Gbase8sDialect.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.gbase8s.dialect; + +import com.dtstack.chunjun.connector.gbase8s.converter.Gbase8sRawTypeConverter; +import com.dtstack.chunjun.connector.jdbc.dialect.JdbcDialect; +import com.dtstack.chunjun.converter.RawTypeMapper; + +import org.apache.commons.lang3.StringUtils; + +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +public class Gbase8sDialect implements JdbcDialect { + + private static final String GBASE_QUOTATION_MASK = ""; + + @Override + public String dialectName() { + return "GBase8s"; + } + + @Override + public boolean canHandle(String url) { + return url.startsWith("jdbc:gbasedbt-sqli:"); + } + + @Override + public RawTypeMapper getRawTypeConverter() { + return Gbase8sRawTypeConverter::apply; + } + + @Override + public Optional defaultDriverName() { + return Optional.of("com.gbasedbt.jdbc.Driver"); + } + + /** build select sql , such as (SELECT :A "A",? "B" FROM DUAL) */ + public String buildDualQueryStatement(String[] column) { + StringBuilder sb = new StringBuilder("SELECT count(1),"); + String placeholders = + Arrays.stream(column) + .map(f -> ":" + f + " as " + quoteIdentifier(f)) + .collect(Collectors.joining(", ")); + sb.append(placeholders); + + return sb.toString(); + } + + @Override + public Optional getUpsertStatement( + String schema, + String tableName, + String[] fieldNames, + String[] uniqueKeyFields, + boolean allReplace) { + tableName = buildTableInfoWithSchema(schema, tableName); + StringBuilder mergeIntoSql = new StringBuilder(64); + mergeIntoSql + .append("MERGE INTO ") + .append(tableName) + .append(" T1 USING (") + .append(buildDualQueryStatement(fieldNames)) + .append(" FROM ") + .append(tableName) + .append(" limit 1 ") + .append(") T2 ON (") + .append(buildEqualConditions(uniqueKeyFields)) + .append(") "); + + String updateSql = buildUpdateConnection(fieldNames, uniqueKeyFields, allReplace); + + if (StringUtils.isNotEmpty(updateSql)) { + mergeIntoSql.append(" WHEN MATCHED THEN UPDATE SET "); + mergeIntoSql.append(updateSql); + } + + mergeIntoSql + .append(" WHEN NOT MATCHED THEN ") + .append("INSERT (") + .append( + Arrays.stream(fieldNames) + .map(this::quoteIdentifier) + .collect(Collectors.joining(", "))) + .append(") VALUES (") + .append( + Arrays.stream(fieldNames) + .map(col -> "T2." + quoteIdentifier(col)) + .collect(Collectors.joining(", "))) + .append(")"); + + return Optional.of(mergeIntoSql.toString()); + } + + /** build T1."A"=T2."A" or T1."A"=nvl(T2."A",T1."A") */ + private String buildUpdateConnection( + String[] fieldNames, String[] uniqueKeyFields, boolean allReplace) { + List uniqueKeyList = Arrays.asList(uniqueKeyFields); + return Arrays.stream(fieldNames) + .filter(col -> !uniqueKeyList.contains(col)) + .map(col -> buildConnectString(allReplace, col)) + .collect(Collectors.joining(",")); + } + + /** + * Depending on parameter [allReplace] build different sql part. e.g T1."A"=T2."A" or + * T1."A"=nvl(T2."A",T1."A") + */ + private String buildConnectString(boolean allReplace, String col) { + return allReplace + ? quoteIdentifier("T1") + + "." + + quoteIdentifier(col) + + " = " + + quoteIdentifier("T2") + + "." + + quoteIdentifier(col) + : quoteIdentifier("T1") + + "." + + quoteIdentifier(col) + + " =NVL(" + + quoteIdentifier("T2") + + "." + + quoteIdentifier(col) + + "," + + quoteIdentifier("T1") + + "." + + quoteIdentifier(col) + + ")"; + } + + /** build sql part e.g: T1.`A` = T2.`A`, T1.`B` = T2.`B` */ + private String buildEqualConditions(String[] uniqueKeyFields) { + return Arrays.stream(uniqueKeyFields) + .map(col -> "T1." + quoteIdentifier(col) + " = T2." + quoteIdentifier(col)) + .collect(Collectors.joining(", ")); + } + + @Override + public String quoteIdentifier(String identifier) { + if (identifier.startsWith(GBASE_QUOTATION_MASK) + && identifier.endsWith(GBASE_QUOTATION_MASK)) { + return identifier; + } + return GBASE_QUOTATION_MASK + identifier + GBASE_QUOTATION_MASK; + } + + @Override + public String getRowNumColumn(String orderBy) { + return "ROWID as " + getRowNumColumnAlias(); + } +} diff --git a/chunjun-connectors/chunjun-connector-gbase8s/src/main/java/com/dtstack/chunjun/connector/gbase8s/sink/Gbase8sSinkFactory.java b/chunjun-connectors/chunjun-connector-gbase8s/src/main/java/com/dtstack/chunjun/connector/gbase8s/sink/Gbase8sSinkFactory.java new file mode 100644 index 0000000000..faa4525b73 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-gbase8s/src/main/java/com/dtstack/chunjun/connector/gbase8s/sink/Gbase8sSinkFactory.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.gbase8s.sink; + +import com.dtstack.chunjun.config.SyncConfig; +import com.dtstack.chunjun.connector.gbase8s.dialect.Gbase8sDialect; +import com.dtstack.chunjun.connector.jdbc.sink.JdbcSinkFactory; + +public class Gbase8sSinkFactory extends JdbcSinkFactory { + + public Gbase8sSinkFactory(SyncConfig syncConfig) { + super(syncConfig, new Gbase8sDialect()); + } +} diff --git a/chunjun-connectors/chunjun-connector-gbase8s/src/main/java/com/dtstack/chunjun/connector/gbase8s/source/Gbase8sSourceFactory.java b/chunjun-connectors/chunjun-connector-gbase8s/src/main/java/com/dtstack/chunjun/connector/gbase8s/source/Gbase8sSourceFactory.java new file mode 100644 index 0000000000..125685c349 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-gbase8s/src/main/java/com/dtstack/chunjun/connector/gbase8s/source/Gbase8sSourceFactory.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.gbase8s.source; + +import com.dtstack.chunjun.config.SyncConfig; +import com.dtstack.chunjun.connector.gbase8s.dialect.Gbase8sDialect; +import com.dtstack.chunjun.connector.jdbc.source.JdbcSourceFactory; + +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import org.apache.commons.lang3.StringUtils; + +public class Gbase8sSourceFactory extends JdbcSourceFactory { + + public Gbase8sSourceFactory(SyncConfig syncConfig, StreamExecutionEnvironment env) { + super(syncConfig, env, new Gbase8sDialect()); + // 避免result.next阻塞 + if (jdbcConfig.isPolling() + && StringUtils.isEmpty(jdbcConfig.getStartLocation()) + && jdbcConfig.getFetchSize() == 0) { + jdbcConfig.setFetchSize(1000); + } + } +} diff --git a/chunjun-connectors/chunjun-connector-gbase8s/src/main/java/com/dtstack/chunjun/connector/gbase8s/table/Gbase8sDynamicTableFactory.java b/chunjun-connectors/chunjun-connector-gbase8s/src/main/java/com/dtstack/chunjun/connector/gbase8s/table/Gbase8sDynamicTableFactory.java new file mode 100644 index 0000000000..95935f3b72 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-gbase8s/src/main/java/com/dtstack/chunjun/connector/gbase8s/table/Gbase8sDynamicTableFactory.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.gbase8s.table; + +import com.dtstack.chunjun.connector.gbase8s.dialect.Gbase8sDialect; +import com.dtstack.chunjun.connector.jdbc.dialect.JdbcDialect; +import com.dtstack.chunjun.connector.jdbc.table.JdbcDynamicTableFactory; + +public class Gbase8sDynamicTableFactory extends JdbcDynamicTableFactory { + + private static final String IDENTIFIER = "gbase8s-x"; + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + protected JdbcDialect getDialect() { + return new Gbase8sDialect(); + } +} diff --git a/chunjun-connectors/chunjun-connector-gbase8s/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/chunjun-connectors/chunjun-connector-gbase8s/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory new file mode 100644 index 0000000000..70692e4108 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-gbase8s/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +com.dtstack.chunjun.connector.gbase8s.table.Gbase8sDynamicTableFactory diff --git a/chunjun-connectors/chunjun-connector-gbasehk/pom.xml b/chunjun-connectors/chunjun-connector-gbasehk/pom.xml new file mode 100644 index 0000000000..b247ecb055 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-gbasehk/pom.xml @@ -0,0 +1,63 @@ + + + + + + chunjun-connectors + com.dtstack.chunjun + ${revision} + + 4.0.0 + + chunjun-connector-gbasehk + ChunJun : Connectors : GBasehk + + + gbasehk + + + + + com.dtstack.chunjun + chunjun-connector-jdbc-base + ${project.version} + + + com.gbase.hk.jdbc.Driver + gbase + 8.8.0.13 + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + org.apache.maven.plugins + maven-antrun-plugin + + + + diff --git a/chunjun-connectors/chunjun-connector-gbasehk/src/main/java/com/dtstack/chunjun/connector/gbasehk/converter/GbasehkRawTypeConverter.java b/chunjun-connectors/chunjun-connector-gbasehk/src/main/java/com/dtstack/chunjun/connector/gbasehk/converter/GbasehkRawTypeConverter.java new file mode 100644 index 0000000000..e057965635 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-gbasehk/src/main/java/com/dtstack/chunjun/connector/gbasehk/converter/GbasehkRawTypeConverter.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.gbasehk.converter; + +import com.dtstack.chunjun.config.TypeConfig; +import com.dtstack.chunjun.throwable.UnsupportedTypeException; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.types.DataType; + +public class GbasehkRawTypeConverter { + + public static DataType apply(TypeConfig type) { + switch (type.getType()) { + case "BIT": + return DataTypes.BOOLEAN(); + case "TINYINT": + return DataTypes.TINYINT(); + case "SMALLINT": + case "MEDIUMINT": + case "INT": + case "INTEGER": + case "INT24": + case "SERIAL": + return DataTypes.INT(); + case "BIGINT": + case "INT8": + case "BIGSERIAL": + case "SERIAL8": + return DataTypes.BIGINT(); + case "REAL": + case "FLOAT": + case "SMALLFLOAT": + return DataTypes.FLOAT(); + case "DECIMAL": + case "DEC": + case "NUMERIC": + case "MONEY": + // TODO 精度应该可以动态传进来? + return DataTypes.DECIMAL(38, 18); + case "DOUBLE": + case "PRECISION": + return DataTypes.DOUBLE(); + case "CHAR": + case "VARCHAR": + case "TINYTEXT": + case "TEXT": + case "MEDIUMTEXT": + case "LVARCHAR": + case "LONGTEXT": + case "JSON": + case "ENUM": + case "CHARACTER": + case "VARYING": + case "NCHAR": + case "SET": + return DataTypes.STRING(); + case "DATE": + return DataTypes.DATE(); + case "YEAR": + return DataTypes.INTERVAL(DataTypes.YEAR()); + case "TIME": + return DataTypes.TIME(); + case "TIMESTAMP": + return DataTypes.TIMESTAMP(); + case "DATETIME": + return DataTypes.TIMESTAMP(5); + case "TINYBLOB": + case "BLOB": + case "MEDIUMBLOB": + case "LONGBLOB": + case "BINARY": + case "VARBINARY": + case "GEOMETRY": + // BYTES 底层调用的是VARBINARY最大长度 + return DataTypes.BYTES(); + + default: + throw new UnsupportedTypeException(type); + } + } +} diff --git a/chunjun-connectors/chunjun-connector-gbasehk/src/main/java/com/dtstack/chunjun/connector/gbasehk/dialect/GbasehkDialect.java b/chunjun-connectors/chunjun-connector-gbasehk/src/main/java/com/dtstack/chunjun/connector/gbasehk/dialect/GbasehkDialect.java new file mode 100644 index 0000000000..f933d68639 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-gbasehk/src/main/java/com/dtstack/chunjun/connector/gbasehk/dialect/GbasehkDialect.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.gbasehk.dialect; + +import com.dtstack.chunjun.connector.gbasehk.converter.GbasehkRawTypeConverter; +import com.dtstack.chunjun.connector.jdbc.dialect.JdbcDialect; +import com.dtstack.chunjun.converter.RawTypeMapper; + +import org.apache.commons.lang3.StringUtils; + +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +public class GbasehkDialect implements JdbcDialect { + + private static final String GBASE_QUOTATION_MASK = "`"; + + @Override + public String dialectName() { + return "GBasehk"; + } + + @Override + public boolean canHandle(String url) { + return url.startsWith("jdbc:gbase:"); + } + + @Override + public RawTypeMapper getRawTypeConverter() { + return GbasehkRawTypeConverter::apply; + } + + @Override + public Optional defaultDriverName() { + return Optional.of("com.gbase.hk.jdbc.Driver"); + } + + /** build select sql , such as (SELECT :A "A",? "B" FROM DUAL) */ + public String buildDualQueryStatement(String[] column) { + StringBuilder sb = new StringBuilder("SELECT count(1),"); + String placeholders = + Arrays.stream(column) + .map(f -> ":" + f + " as " + quoteIdentifier(f)) + .collect(Collectors.joining(", ")); + sb.append(placeholders); + + return sb.toString(); + } + + @Override + public Optional getUpsertStatement( + String schema, + String tableName, + String[] fieldNames, + String[] uniqueKeyFields, + boolean allReplace) { + tableName = buildTableInfoWithSchema(schema, tableName); + StringBuilder mergeIntoSql = new StringBuilder(64); + mergeIntoSql + .append("MERGE INTO ") + .append(tableName) + .append(" T1 USING (") + .append(buildDualQueryStatement(fieldNames)) + .append(" FROM ") + .append(tableName) + .append(" limit 1 ") + .append(") T2 ON (") + .append(buildEqualConditions(uniqueKeyFields)) + .append(") "); + + String updateSql = buildUpdateConnection(fieldNames, uniqueKeyFields, allReplace); + + if (StringUtils.isNotEmpty(updateSql)) { + mergeIntoSql.append(" WHEN MATCHED THEN UPDATE SET "); + mergeIntoSql.append(updateSql); + } + + mergeIntoSql + .append(" WHEN NOT MATCHED THEN ") + .append("INSERT (") + .append( + Arrays.stream(fieldNames) + .map(this::quoteIdentifier) + .collect(Collectors.joining(", "))) + .append(") VALUES (") + .append( + Arrays.stream(fieldNames) + .map(col -> "T2." + quoteIdentifier(col)) + .collect(Collectors.joining(", "))) + .append(")"); + + return Optional.of(mergeIntoSql.toString()); + } + + /** build T1."A"=T2."A" or T1."A"=nvl(T2."A",T1."A") */ + private String buildUpdateConnection( + String[] fieldNames, String[] uniqueKeyFields, boolean allReplace) { + List uniqueKeyList = Arrays.asList(uniqueKeyFields); + return Arrays.stream(fieldNames) + .filter(col -> !uniqueKeyList.contains(col)) + .map(col -> buildConnectString(allReplace, col)) + .collect(Collectors.joining(",")); + } + + /** + * Depending on parameter [allReplace] build different sql part. e.g T1."A"=T2."A" or + * T1."A"=nvl(T2."A",T1."A") + */ + private String buildConnectString(boolean allReplace, String col) { + return allReplace + ? quoteIdentifier("T1") + + "." + + quoteIdentifier(col) + + " = " + + quoteIdentifier("T2") + + "." + + quoteIdentifier(col) + : quoteIdentifier("T1") + + "." + + quoteIdentifier(col) + + " =NVL(" + + quoteIdentifier("T2") + + "." + + quoteIdentifier(col) + + "," + + quoteIdentifier("T1") + + "." + + quoteIdentifier(col) + + ")"; + } + + /** build sql part e.g: T1.`A` = T2.`A`, T1.`B` = T2.`B` */ + private String buildEqualConditions(String[] uniqueKeyFields) { + return Arrays.stream(uniqueKeyFields) + .map(col -> "T1." + quoteIdentifier(col) + " = T2." + quoteIdentifier(col)) + .collect(Collectors.joining(", ")); + } + + @Override + public String quoteIdentifier(String identifier) { + if (identifier.startsWith(GBASE_QUOTATION_MASK) + && identifier.endsWith(GBASE_QUOTATION_MASK)) { + return identifier; + } + return GBASE_QUOTATION_MASK + identifier + GBASE_QUOTATION_MASK; + } + + @Override + public String getRowNumColumn(String orderBy) { + return "ROWID as " + getRowNumColumnAlias(); + } +} diff --git a/chunjun-connectors/chunjun-connector-gbasehk/src/main/java/com/dtstack/chunjun/connector/gbasehk/sink/GbasehkSinkFactory.java b/chunjun-connectors/chunjun-connector-gbasehk/src/main/java/com/dtstack/chunjun/connector/gbasehk/sink/GbasehkSinkFactory.java new file mode 100644 index 0000000000..1c29dec1cb --- /dev/null +++ b/chunjun-connectors/chunjun-connector-gbasehk/src/main/java/com/dtstack/chunjun/connector/gbasehk/sink/GbasehkSinkFactory.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.gbasehk.sink; + +import com.dtstack.chunjun.config.SyncConfig; +import com.dtstack.chunjun.connector.gbasehk.dialect.GbasehkDialect; +import com.dtstack.chunjun.connector.jdbc.sink.JdbcSinkFactory; + +public class GbasehkSinkFactory extends JdbcSinkFactory { + + public GbasehkSinkFactory(SyncConfig syncConfig) { + super(syncConfig, new GbasehkDialect()); + } +} diff --git a/chunjun-connectors/chunjun-connector-gbasehk/src/main/java/com/dtstack/chunjun/connector/gbasehk/source/GbasehkSourceFactory.java b/chunjun-connectors/chunjun-connector-gbasehk/src/main/java/com/dtstack/chunjun/connector/gbasehk/source/GbasehkSourceFactory.java new file mode 100644 index 0000000000..7a4f12274a --- /dev/null +++ b/chunjun-connectors/chunjun-connector-gbasehk/src/main/java/com/dtstack/chunjun/connector/gbasehk/source/GbasehkSourceFactory.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.gbasehk.source; + +import com.dtstack.chunjun.config.SyncConfig; +import com.dtstack.chunjun.connector.gbasehk.dialect.GbasehkDialect; +import com.dtstack.chunjun.connector.jdbc.source.JdbcSourceFactory; + +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import org.apache.commons.lang3.StringUtils; + +public class GbasehkSourceFactory extends JdbcSourceFactory { + + public GbasehkSourceFactory(SyncConfig syncConfig, StreamExecutionEnvironment env) { + super(syncConfig, env, new GbasehkDialect()); + // 避免result.next阻塞 + if (jdbcConfig.isPolling() + && StringUtils.isEmpty(jdbcConfig.getStartLocation()) + && jdbcConfig.getFetchSize() == 0) { + jdbcConfig.setFetchSize(1000); + } + } +} diff --git a/chunjun-connectors/chunjun-connector-gbasehk/src/main/java/com/dtstack/chunjun/connector/gbasehk/table/GbasehkDynamicTableFactory.java b/chunjun-connectors/chunjun-connector-gbasehk/src/main/java/com/dtstack/chunjun/connector/gbasehk/table/GbasehkDynamicTableFactory.java new file mode 100644 index 0000000000..39bf4f7524 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-gbasehk/src/main/java/com/dtstack/chunjun/connector/gbasehk/table/GbasehkDynamicTableFactory.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.gbasehk.table; + +import com.dtstack.chunjun.connector.gbasehk.dialect.GbasehkDialect; +import com.dtstack.chunjun.connector.jdbc.dialect.JdbcDialect; +import com.dtstack.chunjun.connector.jdbc.table.JdbcDynamicTableFactory; + +public class GbasehkDynamicTableFactory extends JdbcDynamicTableFactory { + + private static final String IDENTIFIER = "gbasehk-x"; + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + protected JdbcDialect getDialect() { + return new GbasehkDialect(); + } +} diff --git a/chunjun-connectors/chunjun-connector-gbasehk/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/chunjun-connectors/chunjun-connector-gbasehk/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory new file mode 100644 index 0000000000..140425edf1 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-gbasehk/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +com.dtstack.chunjun.connector.gbasehk.table.GbasehkDynamicTableFactory diff --git a/chunjun-connectors/pom.xml b/chunjun-connectors/pom.xml index 4cb239ed20..d487569fae 100755 --- a/chunjun-connectors/pom.xml +++ b/chunjun-connectors/pom.xml @@ -57,6 +57,8 @@ chunjun-connector-greenplum chunjun-connector-dm chunjun-connector-gbase + chunjun-connector-gbase8s + chunjun-connector-gbasehk chunjun-connector-clickhouse chunjun-connector-saphana chunjun-connector-doris