From b182f1493780d52ea020331817e74c8316d5eec7 Mon Sep 17 00:00:00 2001 From: mozhenghua Date: Sat, 20 Jul 2024 22:56:47 +0800 Subject: [PATCH] =?UTF-8?q?add=20MariaDB=20endType=20support=20and=20add?= =?UTF-8?q?=20=E5=A2=9E=E5=8A=A0=E4=BB=BB=E5=8A=A1=E5=AE=9E=E4=BE=8B?= =?UTF-8?q?=E5=92=8C=E6=95=B0=E6=8D=AE=E6=BA=90=E5=A4=8D=E5=88=B6=E5=8A=9F?= =?UTF-8?q?=E8=83=BD=20https://github.com/datavane/tis/issues/303?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tis-datax/pom.xml | 1 + .../mongo/reader/ReaderFilterNormalQuery.java | 1 + tis-datax/tis-ds-mysql-mariadb-plugin/pom.xml | 58 ++++ .../ds/mysql/MariaDBDataSourceFactory.java | 116 ++++++++ .../ds/mysql/MariaDBDataSourceFactory.json | 5 + .../src/main/resources/description.md | 3 + .../src/test/java/TestAll.java | 36 +++ .../mysql/TestMariaDBDataSourceFactory.java | 112 ++++++++ .../ds/mysql/TestMariaDBDataSourceReader.java | 87 ++++++ .../tis/plugin/ds/mysql/base-cols-meta.json | 101 +++++++ .../ds/mysql/MySQLDataSourceFactory.java | 2 +- .../tis/plugin/datax/DataxMySQLReader.json | 5 +- .../CloneDefaultDataXProcessor.java | 16 +- .../ds/manipulate/ManipuldateUtils.java | 6 +- .../src/test/java/TestAll.java | 4 +- .../ds/manipulate/TestManipuldateUtils.java | 117 +++++++++ .../ds/manipulate/post-manipulate-body.json | 247 ++++++++++++++++++ 17 files changed, 908 insertions(+), 9 deletions(-) create mode 100644 tis-datax/tis-ds-mysql-mariadb-plugin/pom.xml create mode 100644 tis-datax/tis-ds-mysql-mariadb-plugin/src/main/java/com/qlangtech/tis/plugin/ds/mysql/MariaDBDataSourceFactory.java create mode 100644 tis-datax/tis-ds-mysql-mariadb-plugin/src/main/resources/com/qlangtech/tis/plugin/ds/mysql/MariaDBDataSourceFactory.json create mode 100644 tis-datax/tis-ds-mysql-mariadb-plugin/src/main/resources/description.md create mode 100644 tis-datax/tis-ds-mysql-mariadb-plugin/src/test/java/TestAll.java create mode 100644 tis-datax/tis-ds-mysql-mariadb-plugin/src/test/java/com/qlangtech/tis/plugin/ds/mysql/TestMariaDBDataSourceFactory.java create mode 100644 tis-datax/tis-ds-mysql-mariadb-plugin/src/test/java/com/qlangtech/tis/plugin/ds/mysql/TestMariaDBDataSourceReader.java create mode 100644 tis-datax/tis-ds-mysql-mariadb-plugin/src/test/resources/com/qlangtech/tis/plugin/ds/mysql/base-cols-meta.json create mode 100644 tis-split-table-strategy-plugin/src/test/java/com/qlangtech/tis/plugin/ds/manipulate/TestManipuldateUtils.java create mode 100644 tis-split-table-strategy-plugin/src/test/resources/com/qlangtech/tis/plugin/ds/manipulate/post-manipulate-body.json diff --git a/tis-datax/pom.xml b/tis-datax/pom.xml index 9b4d95419..6894cfe1a 100644 --- a/tis-datax/pom.xml +++ b/tis-datax/pom.xml @@ -67,6 +67,7 @@ tis-ds-mysql-plugin tis-ds-mysql-v5-plugin tis-ds-mysql-v8-plugin + tis-ds-mysql-mariadb-plugin tis-datax-local-executor tis-datax-hudi-dependency diff --git a/tis-datax/tis-datax-mongodb-plugin/src/main/java/com/qlangtech/tis/plugin/datax/mongo/reader/ReaderFilterNormalQuery.java b/tis-datax/tis-datax-mongodb-plugin/src/main/java/com/qlangtech/tis/plugin/datax/mongo/reader/ReaderFilterNormalQuery.java index 936bb81d5..e2a298790 100644 --- a/tis-datax/tis-datax-mongodb-plugin/src/main/java/com/qlangtech/tis/plugin/datax/mongo/reader/ReaderFilterNormalQuery.java +++ b/tis-datax/tis-datax-mongodb-plugin/src/main/java/com/qlangtech/tis/plugin/datax/mongo/reader/ReaderFilterNormalQuery.java @@ -27,6 +27,7 @@ public class ReaderFilterNormalQuery extends ReaderFilter { @Override public Document createFilter() { Document fitler = Document.parse(query); + logger.info("create MongoDB collection filter:{}", fitler.toJson()); return fitler; } diff --git a/tis-datax/tis-ds-mysql-mariadb-plugin/pom.xml b/tis-datax/tis-ds-mysql-mariadb-plugin/pom.xml new file mode 100644 index 000000000..411348474 --- /dev/null +++ b/tis-datax/tis-ds-mysql-mariadb-plugin/pom.xml @@ -0,0 +1,58 @@ + + + + + + com.qlangtech.tis.plugins + tis-datax + ${revision} + ../pom.xml + + 4.0.0 + tpi + + 3.4.1 + + + tis-ds-mysql-mariadb-plugin + + + com.qlangtech.tis.plugins + tis-ds-mysql-plugin + ${project.version} + + + + + org.mariadb.jdbc + mariadb-java-client + ${mariadb-java-client.version} + + + + + + + + + + + diff --git a/tis-datax/tis-ds-mysql-mariadb-plugin/src/main/java/com/qlangtech/tis/plugin/ds/mysql/MariaDBDataSourceFactory.java b/tis-datax/tis-ds-mysql-mariadb-plugin/src/main/java/com/qlangtech/tis/plugin/ds/mysql/MariaDBDataSourceFactory.java new file mode 100644 index 000000000..a8cb59cc4 --- /dev/null +++ b/tis-datax/tis-ds-mysql-mariadb-plugin/src/main/java/com/qlangtech/tis/plugin/ds/mysql/MariaDBDataSourceFactory.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.qlangtech.tis.plugin.ds.mysql; + +import com.google.common.collect.Lists; +import com.qlangtech.tis.annotation.Public; +import com.qlangtech.tis.extension.TISExtension; +import com.qlangtech.tis.manage.common.TisUTF8; +import com.qlangtech.tis.plugin.ds.DBConfig; +import org.apache.commons.lang3.StringUtils; + +import java.io.UnsupportedEncodingException; +import java.net.URLEncoder; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.List; + +/** + * @author: 百岁(baisui@qlangtech.com) + * @create: 2024-06-08 21:47 + **/ +@Public +public class MariaDBDataSourceFactory extends MySQLDataSourceFactory { + + protected static final String DS_TYPE_MARIA_DB = "MariaDB"; + + private transient org.mariadb.jdbc.Driver driver; + + @Override + public JDBCConnection getConnection(String jdbcUrl, boolean verify) throws SQLException { + if (driver == null) { + driver = new org.mariadb.jdbc.Driver(); + } + java.util.Properties info = new java.util.Properties(); + if (this.userName != null) { + info.put("user", this.userName); + } + if (this.password != null) { + info.put("password", this.password); + } + if (verify) { + info.put("connectTimeout", "3000"); + info.put("socketTimeout", "3000"); + info.put("autoReconnect", "false"); + } + + return new JDBCConnection(driver.connect(jdbcUrl, info), jdbcUrl); + } + + @Override + public String buidJdbcUrl(DBConfig db, String ip, String dbName) { + + // https://mariadb.com/kb/en/about-mariadb-connector-j/#java-compatibility +// StringBuffer jdbcUrl = new StringBuffer("jdbc:mariadb://" + ip + ":" + this.port + "/" + dbName + +// "?useUnicode=yes&useCursorFetch=true&useSsl=false&serverTimezone=" + URLEncoder.encode(DEFAULT_SERVER_TIME_ZONE.getId(), TisUTF8.getName())); + + StringBuffer jdbcUrl = new StringBuffer("jdbc:mariadb://" + ip + ":" + this.port + "/" + dbName); + if (this.useCompression != null) { + jdbcUrl.append("&useCompression=").append(this.useCompression); + } +// if (org.apache.commons.lang.StringUtils.isNotEmpty(this.encode)) { +// jdbcUrl.append("&characterEncoding=").append(this.encode); +// } + if (org.apache.commons.lang.StringUtils.isNotEmpty(this.extraParams)) { + jdbcUrl.append("&" + this.extraParams); + } + return jdbcUrl.toString(); + + } + + @Override + public void setReaderStatement(Statement stmt) throws SQLException { + com.mysql.jdbc.Statement statement = (com.mysql.jdbc.Statement) stmt; + statement.enableStreamingResults(); + //statement.setFetchSize(0); + } + + @TISExtension + public static class MariaDBDescriptor extends DefaultDescriptor { + @Override + protected String getDataSourceName() { + return DS_TYPE_MARIA_DB; + } + + @Override + public final EndType getEndType() { + return EndType.MariaDB; + } + + @Override + public List facadeSourceTypes() { + return Lists.newArrayList(DS_TYPE_MARIA_DB); + } + + @Override + protected boolean validateMySQLVer(String mysqlVer) { + return StringUtils.containsIgnoreCase(mysqlVer, DS_TYPE_MARIA_DB); + } + } +} diff --git a/tis-datax/tis-ds-mysql-mariadb-plugin/src/main/resources/com/qlangtech/tis/plugin/ds/mysql/MariaDBDataSourceFactory.json b/tis-datax/tis-ds-mysql-mariadb-plugin/src/main/resources/com/qlangtech/tis/plugin/ds/mysql/MariaDBDataSourceFactory.json new file mode 100644 index 000000000..95b85bae0 --- /dev/null +++ b/tis-datax/tis-ds-mysql-mariadb-plugin/src/main/resources/com/qlangtech/tis/plugin/ds/mysql/MariaDBDataSourceFactory.json @@ -0,0 +1,5 @@ +{ + "encode": { + "disable": true + } +} \ No newline at end of file diff --git a/tis-datax/tis-ds-mysql-mariadb-plugin/src/main/resources/description.md b/tis-datax/tis-ds-mysql-mariadb-plugin/src/main/resources/description.md new file mode 100644 index 000000000..bebb07112 --- /dev/null +++ b/tis-datax/tis-ds-mysql-mariadb-plugin/src/main/resources/description.md @@ -0,0 +1,3 @@ +* 封装`MariaDB`数据源驱动 + + 驱动版本为[MariaDB Connector/J is for Java 8:3.4.1](https://mariadb.com/downloads/connectors/connectors-data-access/java8-connector), 支持`MariaDB`数据源以JDBC的方式连接 diff --git a/tis-datax/tis-ds-mysql-mariadb-plugin/src/test/java/TestAll.java b/tis-datax/tis-ds-mysql-mariadb-plugin/src/test/java/TestAll.java new file mode 100644 index 000000000..5b3786be6 --- /dev/null +++ b/tis-datax/tis-ds-mysql-mariadb-plugin/src/test/java/TestAll.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import com.qlangtech.tis.plugin.ds.mysql.TestMariaDBDataSourceFactory; +import junit.framework.Test; +import junit.framework.TestCase; +import junit.framework.TestSuite; + +/** + * @author: baisui 百岁 + * @create: 2021-01-07 18:52 + **/ +public class TestAll extends TestCase { + + public static Test suite() { + TestSuite suite = new TestSuite(); + suite.addTestSuite(TestMariaDBDataSourceFactory.class); + + return suite; + } +} diff --git a/tis-datax/tis-ds-mysql-mariadb-plugin/src/test/java/com/qlangtech/tis/plugin/ds/mysql/TestMariaDBDataSourceFactory.java b/tis-datax/tis-ds-mysql-mariadb-plugin/src/test/java/com/qlangtech/tis/plugin/ds/mysql/TestMariaDBDataSourceFactory.java new file mode 100644 index 000000000..ecf63c898 --- /dev/null +++ b/tis-datax/tis-ds-mysql-mariadb-plugin/src/test/java/com/qlangtech/tis/plugin/ds/mysql/TestMariaDBDataSourceFactory.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.qlangtech.tis.plugin.ds.mysql; + +import com.qlangtech.tis.plugin.ds.ColumnMetaData; +import com.qlangtech.tis.sql.parser.tuple.creator.EntityName; +import com.qlangtech.tis.trigger.util.JsonUtil; +import junit.framework.TestCase; +import org.junit.Ignore; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.util.Collections; +import java.util.List; + +/** + * @author: 百岁(baisui@qlangtech.com) + * @create: 2021-12-30 10:56 + **/ +public class TestMariaDBDataSourceFactory extends TestCase { + public void testGetTableMetadata() { + MariaDBDataSourceFactory dataSourceFactory = new MariaDBDataSourceFactory(); + dataSourceFactory.useCompression = true; + dataSourceFactory.password = "123456"; + dataSourceFactory.dbName = "order2"; + dataSourceFactory.encode = "utf8"; + dataSourceFactory.port = 3306; + dataSourceFactory.userName = "root"; + dataSourceFactory.nodeDesc = "192.168.28.200"; + + + List baseColsMeta = dataSourceFactory.getTableMetadata(false, EntityName.parse("base")); + assertEquals(8, baseColsMeta.size()); + + + JsonUtil.assertJSONEqual(TestMariaDBDataSourceFactory.class, "base-cols-meta.json" + , JsonUtil.toString(Collections.singletonMap("cols", baseColsMeta)), (msg, e, a) -> { + assertEquals(msg, e, a); + }); + } + + + @Ignore + public void testAliyunAdsSource() { + MariaDBDataSourceFactory dataSourceFactory = new MariaDBDataSourceFactory(); + dataSourceFactory.useCompression = false; + dataSourceFactory.password = "SLK_20221218"; + dataSourceFactory.dbName = "local-life"; + dataSourceFactory.encode = "utf8"; + dataSourceFactory.port = 3306; + dataSourceFactory.userName = "slk"; + dataSourceFactory.nodeDesc = "am-2ev66ttmhd5ys6k3l167320o.ads.aliyuncs.com"; + + dataSourceFactory.visitFirstConnection((connection) -> { + try { +// connection.execute( +// "CREATE TABLE `cloudcanal_heartbeat_baisui` (\n" + +// " `id` varchar(32) NOT NULL COMMENT '主键ID',\n" + +// " `name` varchar(50) NOT NULL COMMENT '姓名',\n" + +// " `is_valid` int(1) NOT NULL DEFAULT '1' COMMENT '是否有效,1:有效 0 无效',\n" + +// " `create_time` bigint(20) NOT NULL COMMENT '创建时间',\n" + +// " `op_time` bigint(20) DEFAULT NULL COMMENT '操作时间',\n" + +// " `last_ver` int(11) NOT NULL DEFAULT '1' COMMENT '版本号',\n" + +// " `op_user_id` varchar(32) NOT NULL COMMENT '操作人',\n" + +// " `ext` varchar(1000) DEFAULT NULL,\n" + +// " PRIMARY KEY (`id`)\n" + +// ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4" +// ); + +// connection.execute( +// "INSERT INTO `cloudcanal_heartbeat_baisui`(`id`, `name`, `is_valid`, `create_time`, `op_time`, `last_ver`, `op_user_id`, `ext`)\n" + +// "VALUES ('3', 'cloudcanal_heartbeat', '1', 1690275535280, 1690791060013, '2', '', NULL)" +// ); + + Connection cnn = connection.getConnection(); + try (PreparedStatement prep = cnn.prepareStatement("insert into `cloudcanal_heartbeat_baisui`(`id`, `name`, `is_valid`, `create_time`, `op_time`, `last_ver`, `op_user_id`, `ext`)" + + " values (?,?,?,?,?,?,?,?)")) { + + prep.setString(1, "11"); + prep.setString(2, "baisui"); + prep.setInt(3, 1); + prep.setLong(4, System.currentTimeMillis()); + prep.setLong(5, System.currentTimeMillis()); + prep.setInt(6, 5); + prep.setString(7, "22"); + prep.setString(8, "xxx"); + + prep.execute(); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + } +} diff --git a/tis-datax/tis-ds-mysql-mariadb-plugin/src/test/java/com/qlangtech/tis/plugin/ds/mysql/TestMariaDBDataSourceReader.java b/tis-datax/tis-ds-mysql-mariadb-plugin/src/test/java/com/qlangtech/tis/plugin/ds/mysql/TestMariaDBDataSourceReader.java new file mode 100644 index 000000000..7bfc9826f --- /dev/null +++ b/tis-datax/tis-ds-mysql-mariadb-plugin/src/test/java/com/qlangtech/tis/plugin/ds/mysql/TestMariaDBDataSourceReader.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.qlangtech.tis.plugin.ds.mysql; + +import com.alibaba.datax.plugin.rdbms.util.DBUtil; +import com.qlangtech.tis.plugin.ds.DataSourceFactory; +import com.qlangtech.tis.plugin.ds.IDataSourceFactoryGetter; +import junit.framework.TestCase; +import org.apache.commons.lang3.tuple.Pair; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.Statement; + +/** + * @author: 百岁(baisui@qlangtech.com) + * @create: 2023-05-08 10:41 + **/ +public class TestMariaDBDataSourceReader extends TestCase { + public void testReader() { + MariaDBDataSourceFactory dataSourceFactory = new MariaDBDataSourceFactory(); + dataSourceFactory.useCompression = false; + dataSourceFactory.password = "123456"; + dataSourceFactory.dbName = "item_center"; + dataSourceFactory.encode = "utf8"; + dataSourceFactory.port = 3306; + dataSourceFactory.userName = "root"; + dataSourceFactory.nodeDesc = "192.168.28.200"; + + + IDataSourceFactoryGetter dsGetter = new IDataSourceFactoryGetter() { + @Override + public DataSourceFactory getDataSourceFactory() { + return dataSourceFactory; + } + + @Override + public Integer getRowFetchSize() { + return 2000; + } + }; + + + dataSourceFactory.visitFirstConnection((conn) -> { + Connection connection = conn.getConnection(); +//com.alibaba.datax.plugin.reader.mysqlreader.MysqlReader + String querySql = "select * from item"; + + Pair query = DBUtil.query(connection, querySql, 2000, dsGetter); + try { + int all = 0; + int count = 0; + try (ResultSet rs = query.getRight()) { + while (rs.next()) { + count++; + all++; + if (count > 50000) { + count = 0; + System.out.println(all); + } + } + } + } finally { + query.getLeft().close(); + } + + + }); + + } +} diff --git a/tis-datax/tis-ds-mysql-mariadb-plugin/src/test/resources/com/qlangtech/tis/plugin/ds/mysql/base-cols-meta.json b/tis-datax/tis-ds-mysql-mariadb-plugin/src/test/resources/com/qlangtech/tis/plugin/ds/mysql/base-cols-meta.json new file mode 100644 index 000000000..2e0502588 --- /dev/null +++ b/tis-datax/tis-ds-mysql-mariadb-plugin/src/test/resources/com/qlangtech/tis/plugin/ds/mysql/base-cols-meta.json @@ -0,0 +1,101 @@ +{ + "cols":[ + { + "index":0, + "key":"base_id", + "name":"base_id", + "pk":true, + "type":{ + "collapse":"Long", + "columnSize":10, + "type":4 + }, + "value":"base_id" + }, + { + "index":1, + "key":"start_time", + "name":"start_time", + "pk":false, + "type":{ + "collapse":"Date", + "columnSize":19, + "type":93 + }, + "value":"start_time" + }, + { + "index":2, + "key":"update_date", + "name":"update_date", + "pk":false, + "type":{ + "collapse":"Date", + "columnSize":10, + "type":91 + }, + "value":"update_date" + }, + { + "index":3, + "key":"update_time", + "name":"update_time", + "pk":false, + "type":{ + "collapse":"Date", + "columnSize":19, + "type":93 + }, + "value":"update_time" + }, + { + "index":4, + "key":"price", + "name":"price", + "pk":false, + "type":{ + "collapse":"Double", + "columnSize":5, + "decimalDigits":2, + "type":3 + }, + "value":"price" + }, + { + "index":5, + "key":"json_content", + "name":"json_content", + "pk":false, + "type":{ + "collapse":"STRING", + "columnSize":2000, + "type":12 + }, + "value":"json_content" + }, + { + "index":6, + "key":"col_blob", + "name":"col_blob", + "pk":false, + "type":{ + "collapse":"Bytes", + "columnSize":65535, + "type":-4 + }, + "value":"col_blob" + }, + { + "index":7, + "key":"col_text", + "name":"col_text", + "pk":false, + "type":{ + "collapse":"STRING", + "columnSize":65535, + "type":-1 + }, + "value":"col_text" + } + ] +} diff --git a/tis-datax/tis-ds-mysql-plugin/src/main/java/com/qlangtech/tis/plugin/ds/mysql/MySQLDataSourceFactory.java b/tis-datax/tis-ds-mysql-plugin/src/main/java/com/qlangtech/tis/plugin/ds/mysql/MySQLDataSourceFactory.java index a4f19e578..86c3f1929 100644 --- a/tis-datax/tis-ds-mysql-plugin/src/main/java/com/qlangtech/tis/plugin/ds/mysql/MySQLDataSourceFactory.java +++ b/tis-datax/tis-ds-mysql-plugin/src/main/java/com/qlangtech/tis/plugin/ds/mysql/MySQLDataSourceFactory.java @@ -501,7 +501,7 @@ public Optional getDefaultDataXReaderDescName() { } @Override - public final EndType getEndType() { + public EndType getEndType() { return EndType.MySQL; } diff --git a/tis-datax/tis-ds-mysql-plugin/src/main/resources/com/qlangtech/tis/plugin/datax/DataxMySQLReader.json b/tis-datax/tis-ds-mysql-plugin/src/main/resources/com/qlangtech/tis/plugin/datax/DataxMySQLReader.json index b65e3dfc8..131899769 100644 --- a/tis-datax/tis-ds-mysql-plugin/src/main/resources/com/qlangtech/tis/plugin/datax/DataxMySQLReader.json +++ b/tis-datax/tis-ds-mysql-plugin/src/main/resources/com/qlangtech/tis/plugin/datax/DataxMySQLReader.json @@ -1,6 +1,6 @@ { "dbName": { - "enum": "com.qlangtech.tis.util.PluginItems.getExistDbs(\"MySQL-V5\",\"MySQL-V8\")", + "enum": "com.qlangtech.tis.util.PluginItems.getExistDbs(\"MySQL-V5\",\"MySQL-V8\",\"MariaDB\")", "creator": { "plugin": [ { @@ -8,6 +8,9 @@ }, { "descName": "MySQL-V8" + }, + { + "descName": "MariaDB" } ] } diff --git a/tis-split-table-strategy-plugin/src/main/java/com/qlangtech/tis/plugin/ds/manipulate/CloneDefaultDataXProcessor.java b/tis-split-table-strategy-plugin/src/main/java/com/qlangtech/tis/plugin/ds/manipulate/CloneDefaultDataXProcessor.java index 827f5ef45..8d3d8faaa 100644 --- a/tis-split-table-strategy-plugin/src/main/java/com/qlangtech/tis/plugin/ds/manipulate/CloneDefaultDataXProcessor.java +++ b/tis-split-table-strategy-plugin/src/main/java/com/qlangtech/tis/plugin/ds/manipulate/CloneDefaultDataXProcessor.java @@ -20,6 +20,8 @@ import com.alibaba.citrus.turbine.Context; import com.qlangtech.tis.datax.DefaultDataXProcessorManipulate; +import com.qlangtech.tis.datax.IDataxProcessor; +import com.qlangtech.tis.datax.impl.DataxProcessor; import com.qlangtech.tis.extension.TISExtension; import com.qlangtech.tis.manage.IAppSource; import com.qlangtech.tis.plugin.IPluginStore.AfterPluginSaved; @@ -35,6 +37,7 @@ import org.apache.commons.lang3.StringUtils; import java.util.List; +import java.util.Objects; import java.util.Optional; /** @@ -71,11 +74,14 @@ public void afterSaved(IPluginContext pluginContext, Optional context) /** * 先拷贝所有文件,与下面一步执行前后顺序不能颠倒 */ - IPluginWithStore storePlugins = itemsProcessor.getStorePlugins(); - List pipelines = storePlugins.listPlugins(); - for (IAppSource pipeline : pipelines) { - pipeline.copy(this.name); - } + //IPluginWithStore storePlugins = itemsProcessor.getStorePlugins(); + DataxProcessor copyFromPipeline = (DataxProcessor) DataxProcessor.load(null, originId[0]); + Objects.requireNonNull(copyFromPipeline, "name:" + originId[0] + " relevant pipeline can not be null"); + copyFromPipeline.copy(this.name); +// List pipelines = storePlugins.listPlugins(); +// for (IAppSource pipeline : pipelines) { +// pipeline.copy(this.name); +// } /** * 再将新的带有替换后的identityName名的实例保存 diff --git a/tis-split-table-strategy-plugin/src/main/java/com/qlangtech/tis/plugin/ds/manipulate/ManipuldateUtils.java b/tis-split-table-strategy-plugin/src/main/java/com/qlangtech/tis/plugin/ds/manipulate/ManipuldateUtils.java index 4082ee05b..0900f74a7 100644 --- a/tis-split-table-strategy-plugin/src/main/java/com/qlangtech/tis/plugin/ds/manipulate/ManipuldateUtils.java +++ b/tis-split-table-strategy-plugin/src/main/java/com/qlangtech/tis/plugin/ds/manipulate/ManipuldateUtils.java @@ -39,12 +39,15 @@ * @create: 2024-07-10 23:16 **/ public class ManipuldateUtils { + + + public static IPluginItemsProcessor cloneInstance(IPluginContext pluginContext, Context context, String newIdentityName , Consumer pluginMetaConsumer , Consumer originIdentityIdConsumer) { Objects.requireNonNull(context, "param content can not be null"); JSONObject postContent = Objects.requireNonNull(pluginContext, "pluginContext can not be null").getJSONPostContent(); - JSONObject manipulateTarget = postContent.getJSONObject("manipulateTarget"); + JSONObject manipulateTarget = postContent.getJSONObject(IUploadPluginMeta.KEY_JSON_MANIPULATE_TARGET); final String keyManipulatePluginMeta = "manipulatePluginMeta"; String pluginType = postContent.getString(keyManipulatePluginMeta); if (StringUtils.isEmpty(pluginType)) { @@ -58,6 +61,7 @@ public static IPluginItemsProcessor cloneInstance(IPluginContext pluginContext, throw new IllegalStateException("pluginMeta can not be empty"); } for (IUploadPluginMeta meta : pluginMeta) { + meta.putExtraParams(DBIdentity.KEY_UPDATE, Boolean.FALSE.toString()); pluginMetaConsumer.accept(meta); JSONArray itemsArray = new JSONArray(); diff --git a/tis-split-table-strategy-plugin/src/test/java/TestAll.java b/tis-split-table-strategy-plugin/src/test/java/TestAll.java index f81bfae37..3bf95f392 100644 --- a/tis-split-table-strategy-plugin/src/test/java/TestAll.java +++ b/tis-split-table-strategy-plugin/src/test/java/TestAll.java @@ -16,6 +16,7 @@ * limitations under the License. */ +import com.qlangtech.tis.plugin.ds.manipulate.TestManipuldateUtils; import com.qlangtech.tis.plugin.ds.split.TestDefaultSplitTableStrategy; import org.junit.runner.RunWith; import org.junit.runners.Suite; @@ -26,7 +27,8 @@ **/ @RunWith(Suite.class) @Suite.SuiteClasses({ - TestDefaultSplitTableStrategy.class}) + TestDefaultSplitTableStrategy.class + , TestManipuldateUtils.class}) public class TestAll //extends TestCase { diff --git a/tis-split-table-strategy-plugin/src/test/java/com/qlangtech/tis/plugin/ds/manipulate/TestManipuldateUtils.java b/tis-split-table-strategy-plugin/src/test/java/com/qlangtech/tis/plugin/ds/manipulate/TestManipuldateUtils.java new file mode 100644 index 000000000..e6f05dc39 --- /dev/null +++ b/tis-split-table-strategy-plugin/src/test/java/com/qlangtech/tis/plugin/ds/manipulate/TestManipuldateUtils.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.qlangtech.tis.plugin.ds.manipulate; + +import com.alibaba.citrus.turbine.Context; +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONObject; +import com.google.common.collect.Lists; +import com.qlangtech.tis.extension.impl.PropValRewrite; +import com.qlangtech.tis.runtime.module.misc.IPostContent; +import com.qlangtech.tis.test.TISEasyMock; +import com.qlangtech.tis.trigger.util.JsonUtil; +import com.qlangtech.tis.util.IPluginContext; +import com.qlangtech.tis.util.IPluginItemsProcessor; +import com.qlangtech.tis.util.IUploadPluginMeta; +import com.qlangtech.tis.util.UploadPluginMeta; +import org.apache.commons.lang3.tuple.Pair; +import org.easymock.EasyMock; +import org.easymock.IExpectationSetters; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Collections; +import java.util.List; +import java.util.function.Consumer; + +/** + * @author: 百岁(baisui@qlangtech.com) + * @create: 2024-07-20 08:28 + **/ +public class TestManipuldateUtils implements TISEasyMock { + + @Test + public void testCloneInstance() { + + IPluginContext pluginContext = mock("pluginContext", IPluginContext.class); + IPluginItemsProcessor itemProcessor = mock("itemProcessor", IPluginItemsProcessor.class); + JSONObject postJson = JsonUtil.loadJSON(ManipuldateUtils.class, "post-manipulate-body.json"); + EasyMock.expect(pluginContext.getJSONPostContent()).andReturn(postJson); + + String meta = "appSource:require,update_true,justGetItemRelevant_true,dataxName_mysql_mysql,processModel_createDatax"; + List pluginMetas = (UploadPluginMeta.parse(pluginContext, new String[]{meta}, false)); + EasyMock.expect(pluginContext.parsePluginMeta(new String[]{meta}, false)) + .andReturn(pluginMetas); + + + Context context = mock("context", Context.class); + EasyMock.expect(context.hasErrors()).andReturn(true); + + for (IUploadPluginMeta m : pluginMetas) { + + JSONArray itemsArray = new JSONArray(); + itemsArray.add(postJson.getJSONObject(IUploadPluginMeta.KEY_JSON_MANIPULATE_TARGET)); + + pluginContext.getPluginItems(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.eq(0), EasyMock.anyObject(), EasyMock.eq(false), EasyMock.anyObject()); + IExpectationSetters> getPluginItemsSetters = EasyMock.expectLastCall(); + getPluginItemsSetters.andStubDelegateTo(new DelegatePostContent(itemProcessor)); + + } + + String newIdentityName = "test"; + Consumer pluginMetaConsumer = (mt) -> { + }; + Consumer originIdentityIdConsumer = (id) -> { + }; + + replay(); + IPluginItemsProcessor itemsProcessor + = ManipuldateUtils.cloneInstance(pluginContext, context, newIdentityName, pluginMetaConsumer, originIdentityIdConsumer); + Assert.assertNull("because newIdentityName is duplicate, result itemsProcessor shall be null", itemsProcessor); + + verifyAll(); + } + + private class DelegatePostContent implements IPostContent { + private final IPluginItemsProcessor itemsProcessor; + + public DelegatePostContent(IPluginItemsProcessor itemsProcessor) { + this.itemsProcessor = itemsProcessor; + } + + @Override + public Pair getPluginItems( + IUploadPluginMeta pluginMeta, Context context, int pluginIndex, JSONArray itemsArray, boolean verify, PropValRewrite propValRewrite) { + UploadPluginMeta meta = (UploadPluginMeta) pluginMeta; + // 要保证是insert 操作 + Assert.assertFalse("shall not be update", meta.isUpdate()); + return Pair.of(false, itemsProcessor); + } + + @Override + public List parsePluginMeta(String[] plugins, boolean useCache) { + throw new UnsupportedOperationException(); + } + + @Override + public JSONObject getJSONPostContent() { + throw new UnsupportedOperationException(); + } + } +} diff --git a/tis-split-table-strategy-plugin/src/test/resources/com/qlangtech/tis/plugin/ds/manipulate/post-manipulate-body.json b/tis-split-table-strategy-plugin/src/test/resources/com/qlangtech/tis/plugin/ds/manipulate/post-manipulate-body.json new file mode 100644 index 000000000..529cd414c --- /dev/null +++ b/tis-split-table-strategy-plugin/src/test/resources/com/qlangtech/tis/plugin/ds/manipulate/post-manipulate-body.json @@ -0,0 +1,247 @@ +{ + "items": [ + [{ + "updateModel": false, + "impl": "com.qlangtech.tis.plugin.ds.manipulate.CloneDefaultDataXProcessor", + "vals": { + "name": { + "updateModel": false, + "_primaryVal": "test", + "has_set_primaryVal": false, + "disabled": false, + "key": "name", + "pk": true, + "_eprops": { + "help": "填写新实例名称,不能与已存在的数据管道实例重名", + "label": "新实例ID" + }, + "placeholder": "", + "dateTimeFormat": "yyyy-MM-dd HH:mm:ss", + "required": true, + "type": 1 + } + }, + "displayName": "Clone", + "showAllField": false, + "dspt": { + "impl": "com.qlangtech.tis.plugin.ds.manipulate.CloneDefaultDataXProcessor", + "pkField": "name", + "implUrl": "http://tis.pub/docs/plugin/plugins/#comqlangtechtisplugindsmanipulateclonedefaultdataxprocessor", + "displayName": "Clone", + "extendPoint": "com.qlangtech.tis.datax.DefaultDataXProcessorManipulate", + "containAdvance": false, + "veriflable": false, + "extractProps": { + "notebook": { + "activate": false, + "ability": false + } + }, + "attrs": [{ + "ord": 0, + "eprops": { + "help": "填写新实例名称,不能与已存在的数据管道实例重名", + "label": "新实例ID" + }, + "describable": false, + "pk": true, + "type": 1, + "key": "name", + "required": true + }] + }, + "implUrl": "http://tis.pub/docs/plugin/plugins/#comqlangtechtisplugindsmanipulateclonedefaultdataxprocessor", + "_propVals": [{ + "updateModel": false, + "_primaryVal": "test", + "has_set_primaryVal": false, + "disabled": false, + "key": "name", + "pk": true, + "_eprops": { + "help": "填写新实例名称,不能与已存在的数据管道实例重名", + "label": "新实例ID" + }, + "placeholder": "", + "dateTimeFormat": "yyyy-MM-dd HH:mm:ss", + "required": true, + "type": 1 + }] + }] + ], + "manipulateTarget": { + "updateModel": false, + "impl": "com.qlangtech.tis.plugin.datax.DefaultDataxProcessor", + "vals": { + "name": { + "updateModel": true, + "_primaryVal": "mysql_mysql", + "has_set_primaryVal": false, + "disabled": false, + "key": "name", + "pk": true, + "_eprops": { + "label": "实例名称", + "placeholder": "MySQL-import" + }, + "placeholder": "MySQL-import", + "dateTimeFormat": "yyyy-MM-dd HH:mm:ss", + "required": true, + "type": 1 + }, + "globalCfg": { + "updateModel": true, + "_primaryVal": "datax-global-config", + "has_set_primaryVal": false, + "disabled": false, + "key": "globalCfg", + "pk": false, + "_eprops": { + "creator": { + "plugin": [{ + "hetero": "params-cfg", + "descName": "DataX-global" + }], + "label": "配置" + }, + "dftVal": "datax-global-config", + "label": "全局配置" + }, + "dftVal": "datax-global-config", + "placeholder": "", + "dateTimeFormat": "yyyy-MM-dd HH:mm:ss", + "required": true, + "type": 6, + "options": [{ + "impl": "com.qlangtech.tis.plugin.datax.DataXGlobalConfig", + "name": "datax-global-config" + }] + }, + "dptId": { + "updateModel": true, + "_primaryVal": "2", + "has_set_primaryVal": false, + "disabled": false, + "key": "dptId", + "pk": false, + "_eprops": { + "creator": { + "routerLink": "/base/departmentlist", + "label": "部门管理" + }, + "label": "所属部门", + "enum": [{ + "val": "2", + "label": "/tis/default" + }] + }, + "placeholder": "", + "dateTimeFormat": "yyyy-MM-dd HH:mm:ss", + "required": true, + "type": 5 + }, + "recept": { + "updateModel": true, + "_primaryVal": "小明", + "has_set_primaryVal": false, + "disabled": false, + "key": "recept", + "pk": false, + "_eprops": { + "label": "接口人", + "placeholder": "小明" + }, + "placeholder": "小明", + "dateTimeFormat": "yyyy-MM-dd HH:mm:ss", + "required": true, + "type": 1 + } + }, + "displayName": "DataxProcessor", + "showAllField": false, + "dspt": { + "impl": "com.qlangtech.tis.plugin.datax.DefaultDataxProcessor", + "pkField": "name", + "implUrl": "http://tis.pub/docs/plugin/plugins/#comqlangtechtisplugindataxdefaultdataxprocessor", + "displayName": "DataxProcessor", + "extendPoint": "com.qlangtech.tis.manage.IAppSource", + "containAdvance": false, + "veriflable": false, + "extractProps": { + "manipulate": { + "extendPoint": "com.qlangtech.tis.datax.DefaultDataXProcessorManipulate" + }, + "notebook": { + "activate": false, + "ability": false + } + }, + "attrs": [{ + "ord": 0, + "eprops": { + "label": "实例名称", + "placeholder": "MySQL-import" + }, + "describable": false, + "pk": true, + "type": 1, + "key": "name", + "required": true + }, { + "ord": 1, + "eprops": { + "creator": { + "plugin": [{ + "hetero": "params-cfg", + "descName": "DataX-global" + }], + "label": "配置" + }, + "dftVal": "datax-global-config", + "label": "全局配置" + }, + "describable": false, + "options": [{ + "impl": "com.qlangtech.tis.plugin.datax.DataXGlobalConfig", + "name": "datax-global-config" + }], + "pk": false, + "type": 6, + "key": "globalCfg", + "required": true + }, { + "ord": 2, + "eprops": { + "creator": { + "routerLink": "/base/departmentlist", + "label": "部门管理" + }, + "label": "所属部门", + "enum": [{ + "val": "2", + "label": "/tis/default" + }] + }, + "describable": false, + "pk": false, + "type": 5, + "key": "dptId", + "required": true + }, { + "ord": 3, + "eprops": { + "label": "接口人", + "placeholder": "小明" + }, + "describable": false, + "pk": false, + "type": 1, + "key": "recept", + "required": true + }] + }, + "identityName": "mysql_mysql", + "implUrl": "http://tis.pub/docs/plugin/plugins/#comqlangtechtisplugindataxdefaultdataxprocessor" + }, + "manipulatePluginMeta": "appSource:require,update_true,justGetItemRelevant_true,dataxName_mysql_mysql,processModel_createDatax" +} \ No newline at end of file