From bacf0d4e5d140f80f5d01a48b94b7f231ede2f98 Mon Sep 17 00:00:00 2001 From: mozhenghua Date: Tue, 5 Nov 2024 18:02:04 +0800 Subject: [PATCH] =?UTF-8?q?=20=E4=BD=BF=E7=94=A8JSON=20Splitter=E6=8B=86?= =?UTF-8?q?=E5=88=86=E8=AE=B0=E5=BD=95=E4=B8=AD=E7=9A=84JSON=E5=AD=97?= =?UTF-8?q?=E6=AE=B5=E5=86=85=E5=AE=B9=E5=87=BA=E9=94=99,=20modify=20for?= =?UTF-8?q?=20https://github.com/datavane/tis/issues/390?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../dependency-reduced-pom.xml | 8 +- .../dependency-reduced-pom.xml | 8 +- .../plugin/datax/CassandraWriterContext.java | 2 +- .../plugin/datax/DataXDFSWriterContext.java | 2 +- .../plugin/datax/MongoDBWriterContext.java | 2 +- .../TestSqlServer2019DatasourceFactory.java | 54 +++ .../dependency-reduced-pom.xml | 6 +- .../dependency-reduced-pom.xml | 6 +- .../sink/TestChunjunDamengSinkFactory.java | 16 +- .../sink/TestChunjunDorisSinkFactory.java | 86 ++--- .../sink/TestChunjunOracleSinkFactory.java | 8 +- .../TestChunjunPostgreSQLSinkFactory.java | 3 +- ...unjunPostgreSQLSinkFactoryByFullTypes.java | 6 +- .../TestChunjunStarRocksSinkFactory.java | 77 +++-- .../SelectedTableTransformerRules.java | 5 +- .../doris/sink/TestFlinkSinkExecutor.java | 320 ++++++++++++++---- ...TestFlinkSinkExecutorByMySQLFullTypes.java | 7 +- .../TableRegisterFlinkSourceHandle.java | 3 +- .../dependency-reduced-pom.xml | 6 +- .../datax/transformer/impl/ConcatUDF.java | 2 +- .../datax/transformer/impl/CopyValUDF.java | 2 +- .../transformer/impl/JSONSplitterUDF.java | 2 +- 22 files changed, 450 insertions(+), 181 deletions(-) create mode 100644 tis-datax/tis-datax-sqlserver-v2019-plugin/src/test/java/com/qlangtech/tis/plugin/ds/sqlserver/TestSqlServer2019DatasourceFactory.java diff --git a/tis-datax/executor/dolphinscheduler-task-tis-datasync/dependency-reduced-pom.xml b/tis-datax/executor/dolphinscheduler-task-tis-datasync/dependency-reduced-pom.xml index 0ee181a6d..b12be173c 100644 --- a/tis-datax/executor/dolphinscheduler-task-tis-datasync/dependency-reduced-pom.xml +++ b/tis-datax/executor/dolphinscheduler-task-tis-datasync/dependency-reduced-pom.xml @@ -3,13 +3,13 @@ tis-datax com.qlangtech.tis.plugins - 4.1.0 + 4.1.0-SNAPSHOT ../../pom.xml 4.0.0 com.qlangtech.tis.plugins dolphinscheduler-task-tis-datasync - 4.1.0 + 4.1.0-SNAPSHOT GNU Affero General Public License @@ -68,7 +68,7 @@ com.qlangtech.tis tis-base-test - 4.1.0 + 4.1.0-SNAPSHOT test @@ -86,7 +86,7 @@ com.alibaba.datax datax-common - 4.1.0 + 4.1.0-SNAPSHOT provided diff --git a/tis-datax/tis-aliyun-jindo-sdk-extends/tis-aliyun-jindo-sdk-extends-impl/dependency-reduced-pom.xml b/tis-datax/tis-aliyun-jindo-sdk-extends/tis-aliyun-jindo-sdk-extends-impl/dependency-reduced-pom.xml index d66ac7b3c..582392f42 100644 --- a/tis-datax/tis-aliyun-jindo-sdk-extends/tis-aliyun-jindo-sdk-extends-impl/dependency-reduced-pom.xml +++ b/tis-datax/tis-aliyun-jindo-sdk-extends/tis-aliyun-jindo-sdk-extends-impl/dependency-reduced-pom.xml @@ -3,12 +3,12 @@ tis-aliyun-jindo-sdk-extends com.qlangtech.tis.plugins - 4.1.0 + 4.1.0-SNAPSHOT 4.0.0 com.qlangtech.tis.plugins tis-aliyun-jindo-sdk-extends-impl - 4.1.0 + 4.1.0-SNAPSHOT GNU Affero General Public License @@ -46,7 +46,7 @@ com.alibaba.datax datax-common - 4.1.0 + 4.1.0-SNAPSHOT provided @@ -64,7 +64,7 @@ com.qlangtech.tis tis-plugin - 4.1.0 + 4.1.0-SNAPSHOT provided diff --git a/tis-datax/tis-datax-cassandra-plugin/src/main/java/com/qlangtech/tis/plugin/datax/CassandraWriterContext.java b/tis-datax/tis-datax-cassandra-plugin/src/main/java/com/qlangtech/tis/plugin/datax/CassandraWriterContext.java index 1c732dee4..6d47be9b2 100644 --- a/tis-datax/tis-datax-cassandra-plugin/src/main/java/com/qlangtech/tis/plugin/datax/CassandraWriterContext.java +++ b/tis-datax/tis-datax-cassandra-plugin/src/main/java/com/qlangtech/tis/plugin/datax/CassandraWriterContext.java @@ -45,7 +45,7 @@ public CassandraWriterContext(DataXCassandraWriter writer, IDataxProcessor.Table this.tabMapper = tabMapper; this.dsFactory = writer.getDataSourceFactory(); - this.cols = transformerRules.map((rule) -> rule.overwriteCols(tabMapper.getSourceCols()).getCols()) + this.cols = transformerRules.map((rule) -> rule.overwriteCols(tabMapper.getSourceCols()).getColsWithoutVirtualInfo()) .orElseGet(() -> tabMapper.getSourceCols().stream().collect(Collectors.toList())); } diff --git a/tis-datax/tis-datax-dfs-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXDFSWriterContext.java b/tis-datax/tis-datax-dfs-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXDFSWriterContext.java index 3c299b418..1aa3640f5 100644 --- a/tis-datax/tis-datax-dfs-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXDFSWriterContext.java +++ b/tis-datax/tis-datax-dfs-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXDFSWriterContext.java @@ -46,7 +46,7 @@ public DataXDFSWriterContext(DataXDFSWriter writer, IDataxProcessor.TableMap tab Objects.requireNonNull(writer.dfsLinker, "prop linker can not be null"); this.tableMapper = tableMapper; - this.cols = transformerRules.map((rule) -> rule.overwriteCols(tableMapper.getSourceCols()).getCols()) + this.cols = transformerRules.map((rule) -> rule.overwriteCols(tableMapper.getSourceCols()).getColsWithoutVirtualInfo()) .orElseGet(() -> tableMapper.getSourceCols().stream().collect(Collectors.toList())); // this.ftpServer = FTPServer.getServer(writer.linker); diff --git a/tis-datax/tis-datax-mongodb-plugin/src/main/java/com/qlangtech/tis/plugin/datax/MongoDBWriterContext.java b/tis-datax/tis-datax-mongodb-plugin/src/main/java/com/qlangtech/tis/plugin/datax/MongoDBWriterContext.java index 1541330ca..512c70ebb 100644 --- a/tis-datax/tis-datax-mongodb-plugin/src/main/java/com/qlangtech/tis/plugin/datax/MongoDBWriterContext.java +++ b/tis-datax/tis-datax-mongodb-plugin/src/main/java/com/qlangtech/tis/plugin/datax/MongoDBWriterContext.java @@ -96,7 +96,7 @@ private static String getDftColumn(ISelectedTab tab, Optional cols = null; if (transformerRules.isPresent()) { - cols = transformerRules.get().overwriteCols(tab.getCols()).getCols(); + cols = transformerRules.get().overwriteCols(tab.getCols()).getColsWithoutVirtualInfo(); } else { cols = tab.getCols().stream().collect(Collectors.toList()); } diff --git a/tis-datax/tis-datax-sqlserver-v2019-plugin/src/test/java/com/qlangtech/tis/plugin/ds/sqlserver/TestSqlServer2019DatasourceFactory.java b/tis-datax/tis-datax-sqlserver-v2019-plugin/src/test/java/com/qlangtech/tis/plugin/ds/sqlserver/TestSqlServer2019DatasourceFactory.java new file mode 100644 index 000000000..3df2e52f7 --- /dev/null +++ b/tis-datax/tis-datax-sqlserver-v2019-plugin/src/test/java/com/qlangtech/tis/plugin/ds/sqlserver/TestSqlServer2019DatasourceFactory.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.qlangtech.tis.plugin.ds.sqlserver; + +import org.junit.Test; + +import java.sql.Date; +import java.sql.PreparedStatement; +import java.sql.SQLType; +import java.sql.Types; + +/** + * @author: 百岁(baisui@qlangtech.com) + * @create: 2024-11-02 15:05 + **/ +public class TestSqlServer2019DatasourceFactory { + @Test + public void testConnect() { + SqlServer2019DatasourceFactory sqlServer2019DatasourceFactory = new SqlServer2019DatasourceFactory(); + sqlServer2019DatasourceFactory.name = "test"; + sqlServer2019DatasourceFactory.dbName = "tis"; + sqlServer2019DatasourceFactory.useSSL = false; + sqlServer2019DatasourceFactory.nodeDesc = "192.168.28.201"; + sqlServer2019DatasourceFactory.port = 1433; + sqlServer2019DatasourceFactory.userName = "sa"; + sqlServer2019DatasourceFactory.password = "Hello1234!"; + + sqlServer2019DatasourceFactory.visitFirstConnection((conn) -> { + try (PreparedStatement preparedStatement = conn.getConnection().prepareStatement("insert into base(base_id,update_date,col_blob) values( ?, ?, ? )")) { + preparedStatement.setInt(1, 24); + preparedStatement.setTimestamp(2, new java.sql.Timestamp(System.currentTimeMillis())); + preparedStatement.setObject(3, null, Types.NULL); + preparedStatement.execute(); + } + }); + + } +} diff --git a/tis-incr/tis-flink-cdc-mysql-shade-4-debezium-connector-mysql/dependency-reduced-pom.xml b/tis-incr/tis-flink-cdc-mysql-shade-4-debezium-connector-mysql/dependency-reduced-pom.xml index be8e04448..816e48d84 100644 --- a/tis-incr/tis-flink-cdc-mysql-shade-4-debezium-connector-mysql/dependency-reduced-pom.xml +++ b/tis-incr/tis-flink-cdc-mysql-shade-4-debezium-connector-mysql/dependency-reduced-pom.xml @@ -3,12 +3,12 @@ tis-incr com.qlangtech.tis.plugins - 4.1.0 + 4.1.0-SNAPSHOT 4.0.0 com.qlangtech.tis.plugins tis-flink-cdc-shade-4-debezium-connector-mysql - 4.1.0 + 4.1.0-SNAPSHOT GNU Affero General Public License @@ -66,7 +66,7 @@ com.qlangtech.tis tis-plugin - 4.1.0 + 4.1.0-SNAPSHOT provided diff --git a/tis-incr/tis-flink-cdc-postgresql-shade-4-debezium-connector-postgresql/dependency-reduced-pom.xml b/tis-incr/tis-flink-cdc-postgresql-shade-4-debezium-connector-postgresql/dependency-reduced-pom.xml index 4213cbd59..bc703de98 100644 --- a/tis-incr/tis-flink-cdc-postgresql-shade-4-debezium-connector-postgresql/dependency-reduced-pom.xml +++ b/tis-incr/tis-flink-cdc-postgresql-shade-4-debezium-connector-postgresql/dependency-reduced-pom.xml @@ -3,12 +3,12 @@ tis-incr com.qlangtech.tis.plugins - 4.1.0 + 4.1.0-SNAPSHOT 4.0.0 com.qlangtech.tis.plugins tis-flink-cdc-postgresql-shade-4-debezium-connector-postgresql - 4.1.0 + 4.1.0-SNAPSHOT GNU Affero General Public License @@ -68,7 +68,7 @@ com.qlangtech.tis tis-plugin - 4.1.0 + 4.1.0-SNAPSHOT provided diff --git a/tis-incr/tis-flink-chunjun-dameng-plugin/src/test/java/com/qlangtech/plugins/incr/flink/chunjun/dameng/sink/TestChunjunDamengSinkFactory.java b/tis-incr/tis-flink-chunjun-dameng-plugin/src/test/java/com/qlangtech/plugins/incr/flink/chunjun/dameng/sink/TestChunjunDamengSinkFactory.java index 9c9868366..ac5f0e8d4 100644 --- a/tis-incr/tis-flink-chunjun-dameng-plugin/src/test/java/com/qlangtech/plugins/incr/flink/chunjun/dameng/sink/TestChunjunDamengSinkFactory.java +++ b/tis-incr/tis-flink-chunjun-dameng-plugin/src/test/java/com/qlangtech/plugins/incr/flink/chunjun/dameng/sink/TestChunjunDamengSinkFactory.java @@ -24,6 +24,7 @@ import com.qlangtech.tis.plugin.datax.dameng.ds.DaMengDataSourceFactory; import com.qlangtech.tis.plugin.datax.dameng.writer.DataXDaMengWriter; import com.qlangtech.tis.plugin.ds.BasicDataSourceFactory; +import com.qlangtech.tis.plugin.ds.CMeta; import com.qlangtech.tis.plugin.ds.oracle.DamengDSFactoryContainer; import com.qlangtech.tis.plugin.ds.oracle.TISDamengContainer; import com.qlangtech.tis.plugins.incr.flink.connector.ChunjunSinkFactory; @@ -34,6 +35,7 @@ import org.junit.Test; import java.util.ArrayList; +import java.util.List; /** * @author: 百岁(baisui@qlangtech.com) @@ -67,17 +69,18 @@ public static void stop() { @Override protected UpdateMode createIncrMode() { - // InsertType insertType = new InsertType(); + // InsertType insertType = new InsertType(); UpsertType updateMode = new UpsertType(); - // UpdateType updateMode = new UpdateType(); + // UpdateType updateMode = new UpdateType(); // updateMode.updateKey = Lists.newArrayList(colId, updateTime); return updateMode; } - @Override - protected ArrayList getUniqueKey() { - return Lists.newArrayList(colId, updateTime); - } +// @Override +// protected ArrayList getUniqueKey(List metaCols) { +// // return Lists.newArrayList(colId, updateTime); +// return super.getUniqueKey(metaCols); +// } @Override protected BasicDataSourceFactory getDsFactory() { @@ -87,6 +90,7 @@ protected BasicDataSourceFactory getDsFactory() { @Test @Override public void testSinkSync() throws Exception { + super.testSinkSync(); } diff --git a/tis-incr/tis-flink-chunjun-doris-plugin/src/test/java/com/qlangtech/plugins/incr/flink/chunjun/doris/sink/TestChunjunDorisSinkFactory.java b/tis-incr/tis-flink-chunjun-doris-plugin/src/test/java/com/qlangtech/plugins/incr/flink/chunjun/doris/sink/TestChunjunDorisSinkFactory.java index 026319f1f..91f05eff6 100644 --- a/tis-incr/tis-flink-chunjun-doris-plugin/src/test/java/com/qlangtech/plugins/incr/flink/chunjun/doris/sink/TestChunjunDorisSinkFactory.java +++ b/tis-incr/tis-flink-chunjun-doris-plugin/src/test/java/com/qlangtech/plugins/incr/flink/chunjun/doris/sink/TestChunjunDorisSinkFactory.java @@ -28,6 +28,7 @@ import com.qlangtech.tis.coredefine.module.action.TargetResName; import com.qlangtech.tis.datax.IStreamTableMeataCreator; import com.qlangtech.tis.datax.IStreamTableMeta; +import com.qlangtech.tis.datax.impl.DataxProcessor; import com.qlangtech.tis.plugin.IEndTypeGetter; import com.qlangtech.tis.plugin.common.PluginDesc; import com.qlangtech.tis.plugin.datax.SelectedTab; @@ -36,6 +37,7 @@ import com.qlangtech.tis.plugin.ds.BasicDataSourceFactory; import com.qlangtech.tis.plugin.ds.DataSourceMeta; import com.qlangtech.tis.plugin.ds.ISelectedTab; +import com.qlangtech.tis.plugin.ds.JDBCConnection; import com.qlangtech.tis.plugin.ds.doris.DorisSourceFactory; import com.qlangtech.tis.plugins.incr.flink.chunjun.script.ChunjunSqlType; import com.qlangtech.tis.plugins.incr.flink.connector.ChunjunSinkFactory; @@ -46,6 +48,7 @@ import com.qlangtech.tis.sql.parser.tuple.creator.IStreamIncrGenerateStrategy; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang3.tuple.Pair; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.http.HttpEntity; import org.junit.After; import org.junit.Assert; @@ -246,50 +249,49 @@ private AtomicInteger getHttpPutCount() { public void testSinkSyncWithSQL() throws Exception { AtomicInteger httpPutCount = getHttpPutCount(); - super.testSinkSync((dataxProcessor, sinkFactory, env, selectedTab) -> { - /** - * ================================================== - */ - TestTableRegisterFlinkSourceHandle tableRegisterHandle = new TotalpayRegisterFlinkSourceHandle(selectedTab); - tableRegisterHandle.setSinkFuncFactory(sinkFactory); - -// (tab) -> { -// return () -> { -// return selectedTab.getCols().stream() -// .map((c) -> new HdfsColMeta( -// c.getName(), c.isNullable(), c.isPk(), c.getType())).collect(Collectors.toList()); -// }; -// } - tableRegisterHandle.setSourceStreamTableMeta(new IStreamTableMeataCreator.ISourceStreamMetaCreator() { - @Override - public ISelectedTab getSelectedTab(String tableName) { - throw new UnsupportedOperationException(tableName); - } + IStreamScriptRun streamScriptRun = new IStreamScriptRun() { + @Override + protected void runStream(DataxProcessor dataxProcessor + , ChunjunSinkFactory sinkFactory, StreamExecutionEnvironment env, SelectedTab selectedTab) throws Exception { + /** + * ================================================== + */ + TestTableRegisterFlinkSourceHandle tableRegisterHandle = new TotalpayRegisterFlinkSourceHandle(selectedTab); + tableRegisterHandle.setSinkFuncFactory(sinkFactory); + + tableRegisterHandle.setSourceStreamTableMeta(new IStreamTableMeataCreator.ISourceStreamMetaCreator() { + @Override + public ISelectedTab getSelectedTab(String tableName) { + throw new UnsupportedOperationException(tableName); + } - @Override - public IStreamTableMeta getStreamTableMeta(String tableName) { - return () -> { - return selectedTab.getCols().stream() - .map((c) -> new HdfsColMeta( - c.getName(), c.isNullable(), c.isPk(), c.getType())).collect(Collectors.toList()); - }; - } - }); - - List sourceFuncts = Lists.newArrayList(); - dataxProcessor.getTabAlias().forEach((key, val) -> { - Pair> sourceStream = createReaderSource(env, val); - sourceFuncts.add(sourceStream.getRight()); - }); - - SourceChannel sourceChannel = new SourceChannel(sourceFuncts); - sourceChannel.setFocusTabs(Collections.singletonList(selectedTab), dataxProcessor.getTabAlias(), DTOStream::createDispatched); - tableRegisterHandle.consume(new TargetResName(dataXName), sourceChannel, dataxProcessor); - /** - * =========================================== - */ - }); + @Override + public IStreamTableMeta getStreamTableMeta(String tableName) { + return () -> { + return selectedTab.getCols().stream() + .map((c) -> new HdfsColMeta( + c.getName(), c.isNullable(), c.isPk(), c.getType())).collect(Collectors.toList()); + }; + } + }); + + List sourceFuncts = Lists.newArrayList(); + dataxProcessor.getTabAlias().forEach((key, val) -> { + Pair> sourceStream = createReaderSource(this, env, val); + sourceFuncts.add(sourceStream.getRight()); + }); + + SourceChannel sourceChannel = new SourceChannel(sourceFuncts); + sourceChannel.setFocusTabs(Collections.singletonList(selectedTab), dataxProcessor.getTabAlias(), DTOStream::createDispatched); + tableRegisterHandle.consume(new TargetResName(dataXName), sourceChannel, dataxProcessor); + /** + * =========================================== + */ + } + }; + + super.testSinkSync(streamScriptRun); Assert.assertEquals("httpPutCount must be 1", 1, httpPutCount.get()); } diff --git a/tis-incr/tis-flink-chunjun-oracle-plugin/src/test/java/com/qlangtech/plugins/incr/flink/chunjun/oracle/sink/TestChunjunOracleSinkFactory.java b/tis-incr/tis-flink-chunjun-oracle-plugin/src/test/java/com/qlangtech/plugins/incr/flink/chunjun/oracle/sink/TestChunjunOracleSinkFactory.java index bba687b17..6f0483cfe 100644 --- a/tis-incr/tis-flink-chunjun-oracle-plugin/src/test/java/com/qlangtech/plugins/incr/flink/chunjun/oracle/sink/TestChunjunOracleSinkFactory.java +++ b/tis-incr/tis-flink-chunjun-oracle-plugin/src/test/java/com/qlangtech/plugins/incr/flink/chunjun/oracle/sink/TestChunjunOracleSinkFactory.java @@ -73,10 +73,10 @@ protected UpdateMode createIncrMode() { return insertType; } - @Override - protected ArrayList getUniqueKey() { - return Lists.newArrayList(colId, updateTime); - } +// @Override +// protected ArrayList getUniqueKey() { +// return Lists.newArrayList(colId, updateTime); +// } @Override protected BasicDataSourceFactory getDsFactory() { diff --git a/tis-incr/tis-flink-chunjun-postgresql-plugin/src/test/java/com/qlangtech/plugins/incr/flink/chunjun/postgresql/sink/TestChunjunPostgreSQLSinkFactory.java b/tis-incr/tis-flink-chunjun-postgresql-plugin/src/test/java/com/qlangtech/plugins/incr/flink/chunjun/postgresql/sink/TestChunjunPostgreSQLSinkFactory.java index f38ff1989..cfb487b86 100644 --- a/tis-incr/tis-flink-chunjun-postgresql-plugin/src/test/java/com/qlangtech/plugins/incr/flink/chunjun/postgresql/sink/TestChunjunPostgreSQLSinkFactory.java +++ b/tis-incr/tis-flink-chunjun-postgresql-plugin/src/test/java/com/qlangtech/plugins/incr/flink/chunjun/postgresql/sink/TestChunjunPostgreSQLSinkFactory.java @@ -34,6 +34,7 @@ import org.junit.Test; import java.util.ArrayList; +import java.util.List; /** * @author: 百岁(baisui@qlangtech.com) @@ -71,7 +72,7 @@ protected UpdateMode createIncrMode() { } @Override - protected ArrayList getUniqueKey() { + protected List getUniqueKey(List metaCols) { return Lists.newArrayList(colId); } diff --git a/tis-incr/tis-flink-chunjun-postgresql-plugin/src/test/java/com/qlangtech/plugins/incr/flink/chunjun/postgresql/sink/TestChunjunPostgreSQLSinkFactoryByFullTypes.java b/tis-incr/tis-flink-chunjun-postgresql-plugin/src/test/java/com/qlangtech/plugins/incr/flink/chunjun/postgresql/sink/TestChunjunPostgreSQLSinkFactoryByFullTypes.java index 8f38653d2..991f83614 100644 --- a/tis-incr/tis-flink-chunjun-postgresql-plugin/src/test/java/com/qlangtech/plugins/incr/flink/chunjun/postgresql/sink/TestChunjunPostgreSQLSinkFactoryByFullTypes.java +++ b/tis-incr/tis-flink-chunjun-postgresql-plugin/src/test/java/com/qlangtech/plugins/incr/flink/chunjun/postgresql/sink/TestChunjunPostgreSQLSinkFactoryByFullTypes.java @@ -29,13 +29,15 @@ import com.qlangtech.tis.plugins.incr.flink.connector.ChunjunSinkFactory; import com.qlangtech.tis.plugins.incr.flink.connector.UpdateMode; import com.qlangtech.tis.plugins.incr.flink.connector.impl.UpsertType; -import org.apache.flink.cdc.connectors.postgres.PostgresTestBase; + +import com.ververica.cdc.connectors.postgres.PostgresTestBase; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; import java.sql.*; import java.util.ArrayList; +import java.util.List; /** * @author: 百岁(baisui@qlangtech.com) @@ -70,7 +72,7 @@ protected UpdateMode createIncrMode() { } @Override - protected ArrayList getUniqueKey() { + protected List getUniqueKey(List metaCols) { return Lists.newArrayList(colId); } diff --git a/tis-incr/tis-flink-chunjun-starrocks-plugin/src/test/java/com/qlangtech/tis/plugins/incr/flink/chunjun/starrocks/TestChunjunStarRocksSinkFactory.java b/tis-incr/tis-flink-chunjun-starrocks-plugin/src/test/java/com/qlangtech/tis/plugins/incr/flink/chunjun/starrocks/TestChunjunStarRocksSinkFactory.java index 8fd4d3d74..19c209557 100644 --- a/tis-incr/tis-flink-chunjun-starrocks-plugin/src/test/java/com/qlangtech/tis/plugins/incr/flink/chunjun/starrocks/TestChunjunStarRocksSinkFactory.java +++ b/tis-incr/tis-flink-chunjun-starrocks-plugin/src/test/java/com/qlangtech/tis/plugins/incr/flink/chunjun/starrocks/TestChunjunStarRocksSinkFactory.java @@ -27,6 +27,7 @@ import com.qlangtech.tis.coredefine.module.action.TargetResName; import com.qlangtech.tis.datax.IStreamTableMeataCreator; import com.qlangtech.tis.datax.IStreamTableMeta; +import com.qlangtech.tis.datax.impl.DataxProcessor; import com.qlangtech.tis.plugin.IEndTypeGetter; import com.qlangtech.tis.plugin.datax.SelectedTab; import com.qlangtech.tis.plugin.datax.common.BasicDataXRdbmsWriter; @@ -46,6 +47,7 @@ import com.qlangtech.tis.realtime.transfer.DTO; import com.qlangtech.tis.sql.parser.tuple.creator.IStreamIncrGenerateStrategy; import org.apache.commons.lang3.tuple.Pair; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -91,13 +93,15 @@ public static void stop() { @Test public void testSinkSyncWithSQL() throws Exception { - // AtomicInteger httpPutCount = getHttpPutCount(); - super.testSinkSync((dataxProcessor, sinkFactory, env, selectedTab) -> { - /** - * ================================================== - */ - TestTableRegisterFlinkSourceHandle tableRegisterHandle = new TotalpayRegisterFlinkSourceHandle(selectedTab); - tableRegisterHandle.setSinkFuncFactory(sinkFactory); + IStreamScriptRun streamScriptRun = new IStreamScriptRun() { + @Override + protected void runStream(DataxProcessor dataxProcessor, ChunjunSinkFactory sinkFactory, StreamExecutionEnvironment env, + SelectedTab selectedTab) throws Exception { + /** + * ================================================== + */ + TestTableRegisterFlinkSourceHandle tableRegisterHandle = new TotalpayRegisterFlinkSourceHandle(selectedTab); + tableRegisterHandle.setSinkFuncFactory(sinkFactory); // (tab) -> { // return () -> { @@ -107,34 +111,37 @@ public void testSinkSyncWithSQL() throws Exception { // }; // } - tableRegisterHandle.setSourceStreamTableMeta(new IStreamTableMeataCreator.ISourceStreamMetaCreator() { - @Override - public ISelectedTab getSelectedTab(String tableName) { - return new DefaultTab(tableName); - } - - @Override - public IStreamTableMeta getStreamTableMeta(String tableName) { - return () -> selectedTab.getCols().stream() - .map((c) -> new HdfsColMeta( - c.getName(), c.isNullable(), c.isPk(), c.getType())).collect(Collectors.toList()); - } - }); - - List sourceFuncts = Lists.newArrayList(); - dataxProcessor.getTabAlias().forEach((key, val) -> { - Pair> sourceStream = createReaderSource(env, val, false); - sourceFuncts.add(sourceStream.getRight()); - }); - - SourceChannel sourceChannel = new SourceChannel(sourceFuncts); - sourceChannel.setFocusTabs(Collections.singletonList(selectedTab), dataxProcessor.getTabAlias(), DTOStream::createDispatched); - tableRegisterHandle.consume(new TargetResName(dataXName), sourceChannel, dataxProcessor); - /** - * =========================================== - */ - }); - // Assert.assertEquals("httpPutCount must be 1", 1, httpPutCount.get()); + tableRegisterHandle.setSourceStreamTableMeta(new IStreamTableMeataCreator.ISourceStreamMetaCreator() { + @Override + public ISelectedTab getSelectedTab(String tableName) { + return new DefaultTab(tableName); + } + + @Override + public IStreamTableMeta getStreamTableMeta(String tableName) { + return () -> selectedTab.getCols().stream() + .map((c) -> new HdfsColMeta( + c.getName(), c.isNullable(), c.isPk(), c.getType())).collect(Collectors.toList()); + } + }); + + List sourceFuncts = Lists.newArrayList(); + dataxProcessor.getTabAlias().forEach((key, val) -> { + Pair> sourceStream = createReaderSource(env, val, this); + sourceFuncts.add(sourceStream.getRight()); + }); + + SourceChannel sourceChannel = new SourceChannel(sourceFuncts); + sourceChannel.setFocusTabs(Collections.singletonList(selectedTab), dataxProcessor.getTabAlias(), DTOStream::createDispatched); + tableRegisterHandle.consume(new TargetResName(dataXName), sourceChannel, dataxProcessor); + /** + * =========================================== + */ + } + }; + + + super.testSinkSync(streamScriptRun); } // assertResultSetFromStore(resultSet); diff --git a/tis-incr/tis-flink-extends/src/main/java/com/qlangtech/tis/realtime/SelectedTableTransformerRules.java b/tis-incr/tis-flink-extends/src/main/java/com/qlangtech/tis/realtime/SelectedTableTransformerRules.java index 7446b2d4b..94ebcb829 100644 --- a/tis-incr/tis-flink-extends/src/main/java/com/qlangtech/tis/realtime/SelectedTableTransformerRules.java +++ b/tis-incr/tis-flink-extends/src/main/java/com/qlangtech/tis/realtime/SelectedTableTransformerRules.java @@ -21,6 +21,7 @@ import com.alibaba.datax.core.job.ITransformerBuildInfo; import com.qlangtech.plugins.incr.flink.cdc.FlinkCol; import com.qlangtech.tis.async.message.client.consumer.IFlinkColCreator; +import com.qlangtech.tis.plugin.datax.transformer.OutputParameter; import com.qlangtech.tis.plugin.datax.transformer.RecordTransformerRules; import com.qlangtech.tis.plugin.ds.IColMetaGetter; import com.qlangtech.tis.plugin.ds.ISelectedTab; @@ -49,9 +50,9 @@ public SelectedTableTransformerRules(RecordTransformerRules transformerRules, IS this.rules = transformerRules.createTransformerBuildInfo(dataXContext); } - List cols; + List cols; - public List overwriteColsWithContextParams() { + public List overwriteColsWithContextParams() { if (cols == null) { cols = rules.overwriteColsWithContextParams(this.tab.getCols()); } diff --git a/tis-incr/tis-incr-test/src/main/java/com/qlangtech/plugins/incr/flink/chunjun/doris/sink/TestFlinkSinkExecutor.java b/tis-incr/tis-incr-test/src/main/java/com/qlangtech/plugins/incr/flink/chunjun/doris/sink/TestFlinkSinkExecutor.java index 74fe8a53f..f3d91388a 100644 --- a/tis-incr/tis-incr-test/src/main/java/com/qlangtech/plugins/incr/flink/chunjun/doris/sink/TestFlinkSinkExecutor.java +++ b/tis-incr/tis-incr-test/src/main/java/com/qlangtech/plugins/incr/flink/chunjun/doris/sink/TestFlinkSinkExecutor.java @@ -39,6 +39,7 @@ import com.qlangtech.tis.manage.common.Config; import com.qlangtech.tis.manage.common.TisUTF8; import com.qlangtech.tis.plugin.KeyedPluginStore; +import com.qlangtech.tis.plugin.datax.AbstractCreateTableSqlBuilder.CreateDDL; import com.qlangtech.tis.plugin.datax.CreateTableSqlBuilder; import com.qlangtech.tis.plugin.datax.SelectedTab; import com.qlangtech.tis.plugin.datax.common.BasicDataXRdbmsWriter; @@ -59,6 +60,7 @@ import com.qlangtech.tis.realtime.transfer.DTO; import com.qlangtech.tis.test.TISEasyMock; import com.qlangtech.tis.util.HeteroEnum; +import org.apache.commons.collections.MapUtils; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -82,13 +84,14 @@ import java.sql.SQLException; import java.sql.Statement; import java.sql.Types; -import java.util.ArrayList; +import java.text.SimpleDateFormat; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.function.Consumer; +import java.util.stream.Collectors; /** * @author: 百岁(baisui@qlangtech.com) @@ -103,7 +106,7 @@ public abstract class TestFlinkSinkExecutor extends AbstractTestBase implements protected static final String dbName = "tis"; String colEntityId = "entity_id"; - protected String colNum = "num"; + protected final String colNum = "num"; static protected String colId = "id"; String colCreateTime = "create_time"; protected String updateTime = "update_time"; @@ -112,6 +115,21 @@ public abstract class TestFlinkSinkExecutor extends AbstractTestBase implements static String price = "price"; String pk = "88888888887"; + public static final String newAddedRecord = "88888888889"; + public static final String deleteFinallyId = "88888888890"; + + + final String colEntityIdVal = "334556"; + final int colNumVal = 5; + final int colNumValUpdated = 999; + final long colCreateTimeVal = 20211113115959l; + final BigDecimal priceVal = new BigDecimal("1314.99"); + + final String updateDateVal = "2021-12-09"; + final String starTimeVal = "2021-12-18 09:21:20"; + final String updateTimeVal = "2021-12-17 09:21:20"; + final String updateTimeValUpdated = "2021-12-17 09:21:22"; + @ClassRule(order = 100) public static TestRule name = new TISApplySkipFlinkClassloaderFactoryCreation(); @@ -135,7 +153,7 @@ public void test() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DTO d = createDTO(DTO.EventType.ADD); DTO update = createDTO(DTO.EventType.UPDATE_AFTER, (after) -> { - after.put(colNum, 999); + after.put(colNum, colNumValUpdated); }); // env.fromElements(new DTO[]{d, update}).addSink(new PrintSinkFunction<>()); @@ -156,23 +174,35 @@ public void test() throws Exception { env.execute("testJob"); - Thread.sleep(5000); + Thread.sleep(3000); } - protected int updateNumVal = 999; + // protected int updateNumVal = 999; + +// protected DTO[] createTestDTO() { +// return createTestDTO(true); +// } - protected DTO[] createTestDTO() { - return createTestDTO(true); - } - protected DTO[] createTestDTO(boolean needDelete) { + public interface FlinkTestCase { + public List createTestData(); + public void verifyRelevantRow(CreateDDL ddl, Statement statement) throws SQLException; + } + + /** + * 创建一条针对同一 + * + * @param needDelete + * @return + */ + private List createDTOsForOneSingleRecord(boolean needDelete) { DTO add = createDTO(DTO.EventType.ADD); final DTO updateBefore = createDTO(DTO.EventType.UPDATE_BEFORE, (after) -> { - after.put(colNum, updateNumVal); - after.put(updateTime, "2021-12-17 09:21:22"); + after.put(colNum, colNumValUpdated); + after.put(updateTime, updateTimeValUpdated); }); final DTO updateAfter = updateBefore.colone(); updateAfter.setEventType(DTO.EventType.UPDATE_AFTER); @@ -184,7 +214,7 @@ protected DTO[] createTestDTO(boolean needDelete) { delete.setEventType(DTO.EventType.DELETE); dtos.add(delete); } - return dtos.toArray(new DTO[dtos.size()]); //new DTO[]{add, updateAfter}; + return dtos; } /** @@ -193,38 +223,80 @@ protected DTO[] createTestDTO(boolean needDelete) { * @throws Exception */ protected void testSinkSync() throws Exception { - testSinkSync((dataxProcessor, sinkFactory, env, selectedTab) -> { - IFlinkColCreator flinkColCreator = null; - Map> sinkFunction = sinkFactory.createSinkFunction(dataxProcessor, flinkColCreator); - Assert.assertEquals(1, sinkFunction.size()); - this.startTestSinkSync(sinkFunction); + final IStreamScriptRun streamScriptRun = new IStreamScriptRun() { + @Override + public void runStream(DataxProcessor dataxProcessor, ChunjunSinkFactory sinkFactory, StreamExecutionEnvironment env, SelectedTab selectedTab) throws Exception { + IFlinkColCreator flinkColCreator = null; + Map> sinkFunction = sinkFactory.createSinkFunction(dataxProcessor, flinkColCreator); + Assert.assertEquals(1, sinkFunction.size()); + TestFlinkSinkExecutor.this.startTestSinkSync(sinkFunction); + + for (Map.Entry> entry : sinkFunction.entrySet()) { - for (Map.Entry> entry : sinkFunction.entrySet()) { + Pair> sourceStream = createReaderSource(env, entry.getKey(), this); - Pair> sourceStream = createReaderSource(env, entry.getKey()); + entry.getValue().add2Sink(sourceStream.getKey()); - entry.getValue().add2Sink(sourceStream.getKey()); + // sourceStream.getStream().addSink(); - // sourceStream.getStream().addSink(); + // entry.getValue().add2Sink(sourceStream.addStream(env.fromElements(new DTO[]{d, update}))); + // env.fromElements(new DTO[]{d}).addSink(entry.getValue()); + break; + } - // entry.getValue().add2Sink(sourceStream.addStream(env.fromElements(new DTO[]{d, update}))); - // env.fromElements(new DTO[]{d}).addSink(entry.getValue()); - break; + env.execute("testJob"); } - env.execute("testJob"); - }); + }; + + + this.testSinkSync(streamScriptRun); } +// protected DTO[] createTestDTO() { +// +// // 创建一条更新记录 +// List dtos = createDTOsForOneSingleRecord(false); +// +// // 创建一条添加记录 +// DTO newAdd = createDTO(DTO.EventType.ADD, (after) -> { +// after.put(colId, newAddedRecord); +// }); +// +// dtos.add(newAdd); +// +// +// // 创建三条event,最终该记录会被删除 +// List deleteFinally = this.createDTOsForOneSingleRecord(true); +// Map after; +// Map before; +// for (DTO d : deleteFinally) { +// after = d.getAfter(); +// before = d.getBefore(); +// if (MapUtils.isNotEmpty(after)) { +// after.put(colId, deleteFinallyId); +// } +// if (MapUtils.isNotEmpty(before)) { +// before.put(colId, deleteFinallyId); +// } +// // 三条: 1.添加 2.更新 3.删除 +// dtos.add(d); +// } +// +// Assert.assertEquals(7, dtos.size()); +// return dtos.toArray(new DTO[dtos.size()]); //new DTO[]{add, updateAfter}; +// } + protected void startTestSinkSync(Map> sinkFunction) { } // @Test protected void testSinkSync(IStreamScriptRun streamScriptRun) throws Exception { + List flinkTestCases = streamScriptRun.createFlinkTestCases(); // System.out.println("logger.getClass():" + logger.getClass()); @@ -249,8 +321,7 @@ DISTRIBUTED BY HASH(`id`) BUCKETS 10 try { - // String[] colNames = new String[]{colEntityId, colNum, colId, colCreateTime, updateTime, updateDate, - // starTime}; + // 定义MetaData SelectedTab totalpayInfo = createSelectedTab(); // tableName = totalpayInfo.getName(); DataxProcessor dataxProcessor = mock("dataxProcessor", DataxProcessor.class); @@ -347,7 +418,7 @@ public IMQListener create() { streamScriptRun.runStream(dataxProcessor, sinkFactory, env, totalpayInfo); - Thread.sleep(9000); + Thread.sleep(3000); // DBConfig dbConfig = this.getDsFactory().getDbConfig(); @@ -360,52 +431,176 @@ public IMQListener create() { final CreateTableSqlBuilder.CreateDDL ddl = createDDL; this.getDsFactory().visitFirstConnection((c) -> { Connection conn = c.getConnection(); - try (Statement statement = conn.createStatement()) { - // + " where id='" + pk + "'" - try (ResultSet resultSet = statement.executeQuery(ddl.getSelectAllScript())) { - if (resultSet.next()) { - IResultRows.printRow(resultSet); - assertResultSetFromStore(resultSet); - } else { - Assert.fail("have not find row with id=" + pk); - } - } - } + bizVerify(conn, flinkTestCases, ddl); }); } this.verifyAll(); } catch (Throwable e) { - Thread.sleep(14000); + // Thread.sleep(4000); throw new RuntimeException(e); } } - public interface IStreamScriptRun { + /** + * 到目标端数据库中验证最终数据是否正确 + * + * @param conn + * @param ddl + * @throws SQLException + */ + protected void bizVerify(Connection conn, List flinkTestCases, CreateDDL ddl) throws SQLException { + + try (Statement statement = conn.createStatement()) { + for (FlinkTestCase testCase : flinkTestCases) { + testCase.verifyRelevantRow(ddl, statement); + } + } + } + + + public abstract class IStreamScriptRun { + + protected abstract void runStream(DataxProcessor dataxProcessor, ChunjunSinkFactory sinkFactory, StreamExecutionEnvironment env, + SelectedTab selectedTab) throws Exception; + + private void verifyReocrdVals(CreateDDL ddl, Statement statement, String pk, int colNumVal, String updateTimeVal) throws SQLException { + final String selectSQL = ddl.getSelectAllScript(); + try (ResultSet resultSet = statement.executeQuery(selectSQL + " where " + colId + "='" + pk + "'")) { + if (resultSet.next()) { + // 添加且被更新了 + IResultRows.printRow(resultSet); + assertResultSetFromStore(resultSet); + + Assert.assertEquals("val of " + colEntityId, colEntityIdVal, resultSet.getString(colEntityId)); + Assert.assertEquals("val of " + colNum, colNumVal, resultSet.getInt(colNum)); + Assert.assertEquals("val of " + colId, pk, resultSet.getString(colId)); + Assert.assertEquals("val of " + colCreateTime, colCreateTimeVal, resultSet.getLong(colCreateTime)); + SimpleDateFormat datetimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + Assert.assertEquals("val of " + updateTime, updateTimeVal, datetimeFormat.format(resultSet.getTimestamp(updateTime))); + Assert.assertEquals("val of " + starTime, starTimeVal, datetimeFormat.format(resultSet.getTimestamp(starTime))); + + SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd"); + Assert.assertEquals("val of " + updateDate, updateDateVal, dateFormat.format(resultSet.getDate(updateDate))); + Assert.assertEquals("val of " + price, priceVal, resultSet.getBigDecimal(price)); + + } else { + Assert.fail("have not find row with id=" + pk); + } + } + } + + + public List createFlinkTestCases() { + List testCases = Lists.newArrayList(); + /** + * 创建一条更新记录 + */ + testCases.add(new FlinkTestCase() { + @Override + public List createTestData() { + return createDTOsForOneSingleRecord(false); + } + + @Override + public void verifyRelevantRow(CreateDDL ddl, Statement statement) throws SQLException { + verifyReocrdVals(ddl, statement, pk, colNumValUpdated, updateTimeValUpdated); + } + }); + + /** + * 创建一条添加记录 + */ + testCases.add(new FlinkTestCase() { + @Override + public List createTestData() { + DTO newAdd = createDTO(DTO.EventType.ADD, (after) -> { + after.put(colId, newAddedRecord); + }); + return Lists.newArrayList(newAdd); + } + + @Override + public void verifyRelevantRow(CreateDDL ddl, Statement statement) throws SQLException { + String selectSQL = ddl.getSelectAllScript(); + try (ResultSet resultSet = statement.executeQuery(selectSQL)) { + while (resultSet.next()) { + System.out.println("----->" + resultSet.getString(colId)); + } + } - void runStream(DataxProcessor dataxProcessor, ChunjunSinkFactory sinkFactory, StreamExecutionEnvironment env, - SelectedTab selectedTab) throws Exception; + + verifyReocrdVals(ddl, statement, newAddedRecord, colNumVal, updateTimeVal); + } + }); + + /** + * 创建三条event,最终该记录会被删除 + */ + testCases.add(new FlinkTestCase() { + @Override + public List createTestData() { + List deleteFinally = createDTOsForOneSingleRecord(true); + Map after; + Map before; + for (DTO d : deleteFinally) { + after = d.getAfter(); + before = d.getBefore(); + if (MapUtils.isNotEmpty(after)) { + after.put(colId, deleteFinallyId); + } + if (MapUtils.isNotEmpty(before)) { + before.put(colId, deleteFinallyId); + } + // 三条: 1.添加 2.更新 3.删除 + } + return deleteFinally; + } + + @Override + public void verifyRelevantRow(CreateDDL ddl, Statement statement) throws SQLException { + + final String countSQL = ddl.getCountSelectScript(Optional.of(colId + "='" + deleteFinallyId + "'")); + try (ResultSet resultSet = statement.executeQuery(countSQL)) { + if (resultSet.next()) { + Assert.assertEquals("result count must be 0", 0, resultSet.getInt(1)); + } else { + Assert.fail("must contain value for SQL:" + countSQL); + } + } + } + }); + + return testCases; + } } - protected Pair> createReaderSource(StreamExecutionEnvironment env, + protected Pair> createReaderSource(IStreamScriptRun streamScriptRun, StreamExecutionEnvironment env, TableAlias tableAlia) { - return createReaderSource(env, tableAlia, true); + return createReaderSource(env, tableAlia, streamScriptRun); } - protected Pair> createReaderSource(StreamExecutionEnvironment env, - TableAlias tableAlia, boolean needDelete) { + protected Pair> createReaderSource( + StreamExecutionEnvironment env, + TableAlias tableAlia, IStreamScriptRun streamScriptRun) { + List testRecords = Lists.newArrayList(); + for (FlinkTestCase testCase : streamScriptRun.createFlinkTestCases()) { + for (DTO dto : testCase.createTestData()) { + testRecords.add(dto); + } + } + DTOStream sourceStream = DTOStream.createDispatched(tableAlia.getFrom()); ReaderSource readerSource = ReaderSource.createDTOSource("testStreamSource", - env.fromElements(this.createTestDTO(needDelete)).setParallelism(1)); + env.fromElements(testRecords.toArray(new DTO[testRecords.size()])).setParallelism(1)); readerSource.getSourceStream(env, new Tab2OutputTag<>(Collections.singletonMap(tableAlia, sourceStream))); return Pair.of(sourceStream, readerSource); } protected void assertResultSetFromStore(ResultSet resultSet) throws SQLException { - Assert.assertEquals(updateNumVal, resultSet.getInt(colNum)); - + // Assert.assertEquals("val of " + colNum, colNumValUpdated, resultSet.getInt(colNum)); Assert.assertNotNull(resultSet.getTimestamp(updateTime)); Assert.assertNotNull(resultSet.getTimestamp(starTime)); } @@ -473,7 +668,7 @@ protected SelectedTab createSelectedTab() { SelectedTab totalpayInfo = createSelectedTab(metaCols); totalpayInfo.setIncrSinkProps(sinkExt); totalpayInfo.name = tableName; - totalpayInfo.primaryKeys = getUniqueKey(); + totalpayInfo.primaryKeys = getUniqueKey(metaCols); return totalpayInfo; } @@ -484,8 +679,9 @@ protected SelectedTab createSelectedTab(List metaCols) { return tab; } - protected ArrayList getUniqueKey() { - return Lists.newArrayList(colId, updateTime); + protected List getUniqueKey(List metaCols) { + return (metaCols.stream().filter((col) -> col.isPk()).map((col) -> col.getName()).collect(Collectors.toList()));//.collect(Collectors.toList()); + // return Lists.newArrayList(colId, updateTime); } protected CMeta createUpdateTime() { @@ -513,17 +709,17 @@ protected DTO createDTO(DTO.EventType eventType, Consumer>.. d.setEventType(eventType); d.setTableName(tableName); Map after = Maps.newHashMap(); - after.put(colEntityId, "334556"); - after.put(colNum, 5); + after.put(colEntityId, colEntityIdVal); + after.put(colNum, colNumVal); after.put(colId, pk); - after.put(colCreateTime, 20211113115959l); + after.put(colCreateTime, colCreateTimeVal); // after.put(updateTime, "2021-12-17T09:21:20Z"); - after.put(updateTime, "2021-12-17 09:21:20"); - after.put(starTime, "2021-12-18 09:21:20"); - after.put(updateDate, "2021-12-09"); - after.put(price, new BigDecimal("1314.99")); + after.put(updateTime, updateTimeVal); + after.put(starTime, starTimeVal); + after.put(updateDate, updateDateVal); + after.put(price, priceVal); d.setAfter(after); - if (eventType != DTO.EventType.ADD) { + if (eventType != DTO.EventType.ADD || consumer.length > 0) { d.setBefore(Maps.newHashMap(after)); for (Consumer> c : consumer) { c.accept(after); diff --git a/tis-incr/tis-incr-test/src/main/java/com/qlangtech/plugins/incr/flink/sink/TestFlinkSinkExecutorByMySQLFullTypes.java b/tis-incr/tis-incr-test/src/main/java/com/qlangtech/plugins/incr/flink/sink/TestFlinkSinkExecutorByMySQLFullTypes.java index e1a02a9ca..3c039670b 100644 --- a/tis-incr/tis-incr-test/src/main/java/com/qlangtech/plugins/incr/flink/sink/TestFlinkSinkExecutorByMySQLFullTypes.java +++ b/tis-incr/tis-incr-test/src/main/java/com/qlangtech/plugins/incr/flink/sink/TestFlinkSinkExecutorByMySQLFullTypes.java @@ -36,6 +36,7 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; /** @@ -68,7 +69,7 @@ protected SelectedTab createSelectedTab() { sinkExt.incrMode = upsert; // sinkExt.uniqueKey = this.tabFullType.setIncrSinkProps(sinkExt); - this.tabFullType.primaryKeys = getUniqueKey(); + this.tabFullType.primaryKeys = getUniqueKey(Collections.emptyList()); List cols = this.tabFullType.getCols(); Assert.assertTrue(CollectionUtils.isNotEmpty(cols)); @@ -76,7 +77,7 @@ protected SelectedTab createSelectedTab() { } @Override - protected ArrayList getUniqueKey() { + protected List getUniqueKey(List metaCols) { return Lists.newArrayList(colId); } @@ -114,7 +115,7 @@ private T load(String sf) { // super.assertResultSetFromStore(resultSet); // } - @Override + // @Override protected DTO[] createTestDTO() { return new DTO[]{load("full_types_dto.xml")}; } diff --git a/tis-incr/tis-realtime-flink/src/main/java/com/qlangtech/tis/realtime/TableRegisterFlinkSourceHandle.java b/tis-incr/tis-realtime-flink/src/main/java/com/qlangtech/tis/realtime/TableRegisterFlinkSourceHandle.java index 28763fb14..2d5073bef 100644 --- a/tis-incr/tis-realtime-flink/src/main/java/com/qlangtech/tis/realtime/TableRegisterFlinkSourceHandle.java +++ b/tis-incr/tis-realtime-flink/src/main/java/com/qlangtech/tis/realtime/TableRegisterFlinkSourceHandle.java @@ -30,6 +30,7 @@ import com.qlangtech.tis.datax.TableAlias; import com.qlangtech.tis.datax.impl.DataxWriter; import com.qlangtech.tis.offline.DataxUtils; +import com.qlangtech.tis.plugin.datax.transformer.OutputParameter; import com.qlangtech.tis.plugin.datax.transformer.RecordTransformerRules; import com.qlangtech.tis.plugin.ds.DBConfig; import com.qlangtech.tis.plugin.ds.DataSourceFactory; @@ -221,7 +222,7 @@ protected void registerSourceTable(StreamTableEnvironment tabEnv tRules = transformers.get(); ITransformerBuildInfo transformerCfg = tRules.createTransformerBuildInfo(namedContext); - List colsWithContextParams + List colsWithContextParams = transformerCfg.overwriteColsWithContextParams(srcCols); transformerMapper = new RowTransformerMapper( FlinkCol.getAllTabColsMeta(colsWithContextParams diff --git a/tis-incr/tis-sink-elasticsearch7/dependency-reduced-pom.xml b/tis-incr/tis-sink-elasticsearch7/dependency-reduced-pom.xml index c00cf1b5a..fa9d83644 100644 --- a/tis-incr/tis-sink-elasticsearch7/dependency-reduced-pom.xml +++ b/tis-incr/tis-sink-elasticsearch7/dependency-reduced-pom.xml @@ -3,12 +3,12 @@ tis-incr com.qlangtech.tis.plugins - 4.1.0 + 4.1.0-SNAPSHOT 4.0.0 com.qlangtech.tis.plugins tis-sink-elasticsearch7 - 4.1.0 + 4.1.0-SNAPSHOT GNU Affero General Public License @@ -63,7 +63,7 @@ com.qlangtech.tis tis-plugin - 4.1.0 + 4.1.0-SNAPSHOT provided diff --git a/tis-transformer/src/main/java/com/qlangtech/tis/plugin/datax/transformer/impl/ConcatUDF.java b/tis-transformer/src/main/java/com/qlangtech/tis/plugin/datax/transformer/impl/ConcatUDF.java index b79a71218..d8e3f6f43 100644 --- a/tis-transformer/src/main/java/com/qlangtech/tis/plugin/datax/transformer/impl/ConcatUDF.java +++ b/tis-transformer/src/main/java/com/qlangtech/tis/plugin/datax/transformer/impl/ConcatUDF.java @@ -92,7 +92,7 @@ public static List getToCols() { @Override public List outParameters() { - return Collections.singletonList(OutputParameter.create(this.to)); + return Collections.singletonList(TargetColType.create(this.to)); } @Override diff --git a/tis-transformer/src/main/java/com/qlangtech/tis/plugin/datax/transformer/impl/CopyValUDF.java b/tis-transformer/src/main/java/com/qlangtech/tis/plugin/datax/transformer/impl/CopyValUDF.java index ae7ce403e..afa97abea 100644 --- a/tis-transformer/src/main/java/com/qlangtech/tis/plugin/datax/transformer/impl/CopyValUDF.java +++ b/tis-transformer/src/main/java/com/qlangtech/tis/plugin/datax/transformer/impl/CopyValUDF.java @@ -54,7 +54,7 @@ private TargetColType getTO() { @Override public List outParameters() { - return Collections.singletonList(OutputParameter.create(this.getTO())); + return Collections.singletonList(TargetColType.create(this.getTO())); } @Override diff --git a/tis-transformer/src/main/java/com/qlangtech/tis/plugin/datax/transformer/impl/JSONSplitterUDF.java b/tis-transformer/src/main/java/com/qlangtech/tis/plugin/datax/transformer/impl/JSONSplitterUDF.java index 546fdcc3c..74f51c7c1 100644 --- a/tis-transformer/src/main/java/com/qlangtech/tis/plugin/datax/transformer/impl/JSONSplitterUDF.java +++ b/tis-transformer/src/main/java/com/qlangtech/tis/plugin/datax/transformer/impl/JSONSplitterUDF.java @@ -66,7 +66,7 @@ public class JSONSplitterUDF extends AbstractFromColumnUDFDefinition { @Override public List outParameters() { return this.to.stream().map((col) -> { - return OutputParameter.create(getPrefixToFieldName(col), col); + return TargetColType.create(getPrefixToFieldName(col), col); }).collect(Collectors.toList());//.stream().map((c) -> c).collect(Collectors.toList()); }