Skip to content

Commit

Permalink
add MariaDB endType support and add 增加任务实例和数据源复制功能 datavane/tis#303
Browse files Browse the repository at this point in the history
  • Loading branch information
baisui1981 committed Jul 20, 2024
1 parent 9a2775f commit b182f14
Show file tree
Hide file tree
Showing 17 changed files with 908 additions and 9 deletions.
1 change: 1 addition & 0 deletions tis-datax/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
<module>tis-ds-mysql-plugin</module>
<module>tis-ds-mysql-v5-plugin</module>
<module>tis-ds-mysql-v8-plugin</module>
<module>tis-ds-mysql-mariadb-plugin</module>
<module>tis-datax-local-executor</module>
<!-- 为测试通过暂时注释掉-->
<module>tis-datax-hudi-dependency</module>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public class ReaderFilterNormalQuery extends ReaderFilter {
@Override
public Document createFilter() {
Document fitler = Document.parse(query);
logger.info("create MongoDB collection filter:{}", fitler.toJson());
return fitler;
}

Expand Down
58 changes: 58 additions & 0 deletions tis-datax/tis-ds-mysql-mariadb-plugin/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--~
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>com.qlangtech.tis.plugins</groupId>
<artifactId>tis-datax</artifactId>
<version>${revision}</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>tpi</packaging>
<properties>
<mariadb-java-client.version>3.4.1</mariadb-java-client.version>
</properties>
<!--https://mariadb.com/kb/en/about-mariadb-connector-j/#java-compatibility-->
<artifactId>tis-ds-mysql-mariadb-plugin</artifactId>
<dependencies>
<dependency>
<groupId>com.qlangtech.tis.plugins</groupId>
<artifactId>tis-ds-mysql-plugin</artifactId>
<version>${project.version}</version>
</dependency>


<dependency>
<groupId>org.mariadb.jdbc</groupId>
<artifactId>mariadb-java-client</artifactId>
<version>${mariadb-java-client.version}</version>
</dependency>


<!-- <dependency>-->
<!-- <groupId>com.qlangtech.tis.plugins</groupId>-->
<!-- <artifactId>tis-datax-common-plugin</artifactId>-->
<!-- </dependency>-->
</dependencies>


</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.qlangtech.tis.plugin.ds.mysql;

import com.google.common.collect.Lists;
import com.qlangtech.tis.annotation.Public;
import com.qlangtech.tis.extension.TISExtension;
import com.qlangtech.tis.manage.common.TisUTF8;
import com.qlangtech.tis.plugin.ds.DBConfig;
import org.apache.commons.lang3.StringUtils;

import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;

/**
* @author: 百岁([email protected]
* @create: 2024-06-08 21:47
**/
@Public
public class MariaDBDataSourceFactory extends MySQLDataSourceFactory {

protected static final String DS_TYPE_MARIA_DB = "MariaDB";

private transient org.mariadb.jdbc.Driver driver;

@Override
public JDBCConnection getConnection(String jdbcUrl, boolean verify) throws SQLException {
if (driver == null) {
driver = new org.mariadb.jdbc.Driver();
}
java.util.Properties info = new java.util.Properties();
if (this.userName != null) {
info.put("user", this.userName);
}
if (this.password != null) {
info.put("password", this.password);
}
if (verify) {
info.put("connectTimeout", "3000");
info.put("socketTimeout", "3000");
info.put("autoReconnect", "false");
}

return new JDBCConnection(driver.connect(jdbcUrl, info), jdbcUrl);
}

@Override
public String buidJdbcUrl(DBConfig db, String ip, String dbName) {

// https://mariadb.com/kb/en/about-mariadb-connector-j/#java-compatibility
// StringBuffer jdbcUrl = new StringBuffer("jdbc:mariadb://" + ip + ":" + this.port + "/" + dbName +
// "?useUnicode=yes&useCursorFetch=true&useSsl=false&serverTimezone=" + URLEncoder.encode(DEFAULT_SERVER_TIME_ZONE.getId(), TisUTF8.getName()));

StringBuffer jdbcUrl = new StringBuffer("jdbc:mariadb://" + ip + ":" + this.port + "/" + dbName);
if (this.useCompression != null) {
jdbcUrl.append("&useCompression=").append(this.useCompression);
}
// if (org.apache.commons.lang.StringUtils.isNotEmpty(this.encode)) {
// jdbcUrl.append("&characterEncoding=").append(this.encode);
// }
if (org.apache.commons.lang.StringUtils.isNotEmpty(this.extraParams)) {
jdbcUrl.append("&" + this.extraParams);
}
return jdbcUrl.toString();

}

@Override
public void setReaderStatement(Statement stmt) throws SQLException {
com.mysql.jdbc.Statement statement = (com.mysql.jdbc.Statement) stmt;
statement.enableStreamingResults();
//statement.setFetchSize(0);
}

@TISExtension
public static class MariaDBDescriptor extends DefaultDescriptor {
@Override
protected String getDataSourceName() {
return DS_TYPE_MARIA_DB;
}

@Override
public final EndType getEndType() {
return EndType.MariaDB;
}

@Override
public List<String> facadeSourceTypes() {
return Lists.newArrayList(DS_TYPE_MARIA_DB);
}

@Override
protected boolean validateMySQLVer(String mysqlVer) {
return StringUtils.containsIgnoreCase(mysqlVer, DS_TYPE_MARIA_DB);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"encode": {
"disable": true
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
* 封装`MariaDB`数据源驱动

驱动版本为[MariaDB Connector/J is for Java 8:3.4.1](https://mariadb.com/downloads/connectors/connectors-data-access/java8-connector), 支持`MariaDB`数据源以JDBC的方式连接
36 changes: 36 additions & 0 deletions tis-datax/tis-ds-mysql-mariadb-plugin/src/test/java/TestAll.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import com.qlangtech.tis.plugin.ds.mysql.TestMariaDBDataSourceFactory;
import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;

/**
* @author: baisui 百岁
* @create: 2021-01-07 18:52
**/
public class TestAll extends TestCase {

public static Test suite() {
TestSuite suite = new TestSuite();
suite.addTestSuite(TestMariaDBDataSourceFactory.class);

return suite;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.qlangtech.tis.plugin.ds.mysql;

import com.qlangtech.tis.plugin.ds.ColumnMetaData;
import com.qlangtech.tis.sql.parser.tuple.creator.EntityName;
import com.qlangtech.tis.trigger.util.JsonUtil;
import junit.framework.TestCase;
import org.junit.Ignore;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.Collections;
import java.util.List;

/**
* @author: 百岁([email protected]
* @create: 2021-12-30 10:56
**/
public class TestMariaDBDataSourceFactory extends TestCase {
public void testGetTableMetadata() {
MariaDBDataSourceFactory dataSourceFactory = new MariaDBDataSourceFactory();
dataSourceFactory.useCompression = true;
dataSourceFactory.password = "123456";
dataSourceFactory.dbName = "order2";
dataSourceFactory.encode = "utf8";
dataSourceFactory.port = 3306;
dataSourceFactory.userName = "root";
dataSourceFactory.nodeDesc = "192.168.28.200";


List<ColumnMetaData> baseColsMeta = dataSourceFactory.getTableMetadata(false, EntityName.parse("base"));
assertEquals(8, baseColsMeta.size());


JsonUtil.assertJSONEqual(TestMariaDBDataSourceFactory.class, "base-cols-meta.json"
, JsonUtil.toString(Collections.singletonMap("cols", baseColsMeta)), (msg, e, a) -> {
assertEquals(msg, e, a);
});
}


@Ignore
public void testAliyunAdsSource() {
MariaDBDataSourceFactory dataSourceFactory = new MariaDBDataSourceFactory();
dataSourceFactory.useCompression = false;
dataSourceFactory.password = "SLK_20221218";
dataSourceFactory.dbName = "local-life";
dataSourceFactory.encode = "utf8";
dataSourceFactory.port = 3306;
dataSourceFactory.userName = "slk";
dataSourceFactory.nodeDesc = "am-2ev66ttmhd5ys6k3l167320o.ads.aliyuncs.com";

dataSourceFactory.visitFirstConnection((connection) -> {
try {
// connection.execute(
// "CREATE TABLE `cloudcanal_heartbeat_baisui` (\n" +
// " `id` varchar(32) NOT NULL COMMENT '主键ID',\n" +
// " `name` varchar(50) NOT NULL COMMENT '姓名',\n" +
// " `is_valid` int(1) NOT NULL DEFAULT '1' COMMENT '是否有效,1:有效 0 无效',\n" +
// " `create_time` bigint(20) NOT NULL COMMENT '创建时间',\n" +
// " `op_time` bigint(20) DEFAULT NULL COMMENT '操作时间',\n" +
// " `last_ver` int(11) NOT NULL DEFAULT '1' COMMENT '版本号',\n" +
// " `op_user_id` varchar(32) NOT NULL COMMENT '操作人',\n" +
// " `ext` varchar(1000) DEFAULT NULL,\n" +
// " PRIMARY KEY (`id`)\n" +
// ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4"
// );

// connection.execute(
// "INSERT INTO `cloudcanal_heartbeat_baisui`(`id`, `name`, `is_valid`, `create_time`, `op_time`, `last_ver`, `op_user_id`, `ext`)\n" +
// "VALUES ('3', 'cloudcanal_heartbeat', '1', 1690275535280, 1690791060013, '2', '', NULL)"
// );

Connection cnn = connection.getConnection();
try (PreparedStatement prep = cnn.prepareStatement("insert into `cloudcanal_heartbeat_baisui`(`id`, `name`, `is_valid`, `create_time`, `op_time`, `last_ver`, `op_user_id`, `ext`)" +
" values (?,?,?,?,?,?,?,?)")) {

prep.setString(1, "11");
prep.setString(2, "baisui");
prep.setInt(3, 1);
prep.setLong(4, System.currentTimeMillis());
prep.setLong(5, System.currentTimeMillis());
prep.setInt(6, 5);
prep.setString(7, "22");
prep.setString(8, "xxx");

prep.execute();
}
} catch (Exception e) {
throw new RuntimeException(e);
}
});

}
}
Loading

0 comments on commit b182f14

Please sign in to comment.