From ac7637581045a4fcec0f462fa3e7520f3a2ade68 Mon Sep 17 00:00:00 2001 From: mozhenghua Date: Wed, 26 Jun 2024 19:19:57 +0800 Subject: [PATCH] add transformer for DataX config file writer cols append with transformer cols --- .../plugin/datax/CassandraWriterContext.java | 47 +++++++------ .../plugin/datax/DataXCassandraWriter.java | 5 +- .../plugin/datax/DataXClickhouseWriter.java | 4 +- .../datax/TestDataXClickhouseWriter.java | 1 + .../datax/common/BasicDataXRdbmsReader.java | 20 ++++-- .../datax/common/RdbmsWriterContext.java | 8 ++- .../dameng/writer/DataXDaMengWriter.java | 6 +- .../dameng/writer/TestDataXDaMengWriter.java | 2 +- .../tis/plugin/datax/DataXDFSWriter.java | 8 +-- .../plugin/datax/DataXDFSWriterContext.java | 56 ++++----------- .../plugin/datax/doris/DataXDorisWriter.java | 5 +- .../datax/doris/DorisWriterContext.java | 7 +- .../datax/DataXElasticsearchWriter.java | 3 +- .../tis/plugin/datax/BasicFSWriter.java | 3 +- .../datax/kafka/writer/DataXKafkaWriter.java | 3 +- .../qlangtech/tis/datax/DataxExecutor.java | 5 +- .../K8SDataXPowerJobOverwriteTemplate.java | 10 --- .../tis/plugin/datax/DataXMongodbWriter.java | 5 +- .../plugin/datax/MongoDBWriterContext.java | 29 +++++--- tis-datax/tis-datax-odps-plugin/pom.xml | 4 ++ .../tis/plugin/datax/DataXOdpsWriter.java | 20 +++--- .../tis/plugin/datax/TestDataXOdpsWriter.java | 7 +- .../tis/plugin/datax/DataXOracleWriter.java | 4 +- .../tis/plugin/datax/OracleWriterContext.java | 37 +++++----- .../plugin/datax/DataXPostgresqlWriter.java | 4 +- .../plugin/datax/PostgreWriterContext.java | 7 +- .../plugin/datax/DataXSqlserverWriter.java | 8 +-- .../plugin/datax/SqlServerWriterContext.java | 7 +- .../plugin/datax/BasicStarRocksWriter.java | 4 +- .../starrocks/StarRocksWriterContext.java | 8 ++- .../tis/plugin/common/BasicTemplate.java | 3 +- .../tis/plugin/common/ReaderTemplate.java | 5 +- .../tis/plugin/common/WriterTemplate.java | 3 +- .../tis/plugin/datax/DataxMySQLWriter.java | 9 +-- .../plugin/datax/TestDataxMySQLWriter.java | 2 +- .../reader/DataXHiveReader.selectedTabs.json | 2 +- .../tis/realtime/BasicTISSinkFactory.java | 3 +- .../TableRegisterFlinkSourceHandle.java | 3 +- .../powerjob/impl/serverport/NodePort.java | 2 +- .../qlangtech/tis/plugin/k8s/K8SUtils.java | 23 +++---- .../impl/serverport/TestNodePort.java | 69 +++++++++++++++++++ .../impl/AbstractFromColumnUDFDefinition.java | 9 +-- .../datax/transformer/impl/ConcatUDF.java | 4 ++ 43 files changed, 280 insertions(+), 194 deletions(-) create mode 100644 tis-k8s-plugin/src/test/java/com/qlangtech/tis/plugin/datax/powerjob/impl/serverport/TestNodePort.java diff --git a/tis-datax/tis-datax-cassandra-plugin/src/main/java/com/qlangtech/tis/plugin/datax/CassandraWriterContext.java b/tis-datax/tis-datax-cassandra-plugin/src/main/java/com/qlangtech/tis/plugin/datax/CassandraWriterContext.java index f7d6f7551..c827488ff 100644 --- a/tis-datax/tis-datax-cassandra-plugin/src/main/java/com/qlangtech/tis/plugin/datax/CassandraWriterContext.java +++ b/tis-datax/tis-datax-cassandra-plugin/src/main/java/com/qlangtech/tis/plugin/datax/CassandraWriterContext.java @@ -1,19 +1,19 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package com.qlangtech.tis.plugin.datax; @@ -21,13 +21,13 @@ import com.google.common.collect.Lists; import com.qlangtech.tis.datax.IDataxContext; import com.qlangtech.tis.datax.IDataxProcessor; -import com.qlangtech.tis.plugin.ds.CMeta; -import com.qlangtech.tis.plugin.ds.ISelectedTab; -import com.qlangtech.tis.plugin.ds.ISelectedTab; +import com.qlangtech.tis.plugin.datax.transformer.RecordTransformerRules; +import com.qlangtech.tis.plugin.ds.IColMetaGetter; import com.qlangtech.tis.plugin.ds.cassandra.CassandraDatasourceFactory; import org.apache.commons.lang.StringUtils; import java.util.List; +import java.util.Optional; import java.util.stream.Collectors; /** @@ -38,11 +38,15 @@ public class CassandraWriterContext implements IDataxContext { private final DataXCassandraWriter writer; private final IDataxProcessor.TableMap tabMapper; private final CassandraDatasourceFactory dsFactory; + private final List cols; - public CassandraWriterContext(DataXCassandraWriter writer, IDataxProcessor.TableMap tabMapper) { + public CassandraWriterContext(DataXCassandraWriter writer, IDataxProcessor.TableMap tabMapper, Optional transformerRules) { this.writer = writer; this.tabMapper = tabMapper; this.dsFactory = writer.getDataSourceFactory(); + + this.cols = transformerRules.map((rule) -> rule.overwriteCols(tabMapper.getSourceCols())) + .orElseGet(() -> tabMapper.getSourceCols().stream().collect(Collectors.toList())); } public String getKeyspace() { @@ -61,8 +65,9 @@ public String getTable() { return this.tabMapper.getTo(); } - public List getColumn() { - return this.tabMapper.getSourceCols(); + public List getColumn() { + return cols; + // return this.tabMapper.getSourceCols(); } public String getHost() { diff --git a/tis-datax/tis-datax-cassandra-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXCassandraWriter.java b/tis-datax/tis-datax-cassandra-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXCassandraWriter.java index 8f9b8246c..32f706457 100644 --- a/tis-datax/tis-datax-cassandra-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXCassandraWriter.java +++ b/tis-datax/tis-datax-cassandra-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXCassandraWriter.java @@ -28,6 +28,7 @@ import com.qlangtech.tis.plugin.annotation.FormField; import com.qlangtech.tis.plugin.annotation.FormFieldType; import com.qlangtech.tis.plugin.annotation.Validator; +import com.qlangtech.tis.plugin.datax.transformer.RecordTransformerRules; import com.qlangtech.tis.plugin.ds.PostedDSProp; import com.qlangtech.tis.plugin.ds.cassandra.CassandraDatasourceFactory; @@ -84,11 +85,11 @@ public String getTemplate() { } @Override - public IDataxContext getSubTask(Optional tableMap) { + public IDataxContext getSubTask(Optional tableMap, Optional transformerRules) { if (!tableMap.isPresent()) { throw new IllegalArgumentException("param tableMap shall be present"); } - return new CassandraWriterContext(this, tableMap.get()); + return new CassandraWriterContext(this, tableMap.get(),transformerRules); } public CassandraDatasourceFactory getDataSourceFactory() { diff --git a/tis-datax/tis-datax-clickhouse-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXClickhouseWriter.java b/tis-datax/tis-datax-clickhouse-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXClickhouseWriter.java index c42737bfb..facb608ec 100644 --- a/tis-datax/tis-datax-clickhouse-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXClickhouseWriter.java +++ b/tis-datax/tis-datax-clickhouse-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXClickhouseWriter.java @@ -191,7 +191,7 @@ public ClickHouseDataSourceFactory getDataSourceFactory() { } @Override - public IDataxContext getSubTask(Optional tableMap) { + public IDataxContext getSubTask(Optional tableMap, Optional transformerRules) { if (!tableMap.isPresent()) { throw new IllegalArgumentException("tableMap shall be present"); } @@ -228,7 +228,7 @@ public IDataxContext getSubTask(Optional tableMap) { // context.setPostSql(helper.replacePlaceholders(this.postSql, resolver)); context.setPostSql(this.postSql); } - context.setCols(IDataxProcessor.TabCols.create(ds, tableMap.get())); + context.setCols(IDataxProcessor.TabCols.create(ds, tableMap.get(), transformerRules)); return context; } diff --git a/tis-datax/tis-datax-clickhouse-plugin/src/test/java/com/qlangtech/tis/plugin/datax/TestDataXClickhouseWriter.java b/tis-datax/tis-datax-clickhouse-plugin/src/test/java/com/qlangtech/tis/plugin/datax/TestDataXClickhouseWriter.java index bd334abc4..4091269c0 100644 --- a/tis-datax/tis-datax-clickhouse-plugin/src/test/java/com/qlangtech/tis/plugin/datax/TestDataXClickhouseWriter.java +++ b/tis-datax/tis-datax-clickhouse-plugin/src/test/java/com/qlangtech/tis/plugin/datax/TestDataXClickhouseWriter.java @@ -19,6 +19,7 @@ package com.qlangtech.tis.plugin.datax; import com.google.common.collect.Lists; +import com.qlangtech.tis.datax.DataXCfgFile; import com.qlangtech.tis.datax.IDataxProcessor; import com.qlangtech.tis.datax.impl.DataxProcessor; import com.qlangtech.tis.datax.impl.DataxWriter; diff --git a/tis-datax/tis-datax-common-plugin/src/main/java/com/qlangtech/tis/plugin/datax/common/BasicDataXRdbmsReader.java b/tis-datax/tis-datax-common-plugin/src/main/java/com/qlangtech/tis/plugin/datax/common/BasicDataXRdbmsReader.java index c231166d1..5c2b64dc5 100644 --- a/tis-datax/tis-datax-common-plugin/src/main/java/com/qlangtech/tis/plugin/datax/common/BasicDataXRdbmsReader.java +++ b/tis-datax/tis-datax-common-plugin/src/main/java/com/qlangtech/tis/plugin/datax/common/BasicDataXRdbmsReader.java @@ -43,6 +43,7 @@ import com.qlangtech.tis.runtime.module.misc.IControlMsgHandler; import com.qlangtech.tis.runtime.module.misc.IFieldErrorHandler; import com.qlangtech.tis.sql.parser.tuple.creator.EntityName; +import org.apache.commons.collections.CollectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -92,12 +93,19 @@ public final List getSelectedTabs() { if (this.preSelectedTabsHash == selectedTabs.hashCode()) { return selectedTabs; } - boolean shallFillSelectedTabMeta = shallFillSelectedTabMeta(); + this.selectedTabs = fillSelectedTabMeta(this.selectedTabs); + this.preSelectedTabsHash = selectedTabs.hashCode(); + return this.selectedTabs; + } + + @Override + public List fillSelectedTabMeta(List tabs) { + boolean shallFillSelectedTabMeta = shallFillSelectedTabMeta(); if (shallFillSelectedTabMeta) { try (TableColsMeta tabsMeta = getTabsMeta()) { - this.selectedTabs = this.selectedTabs.stream().map((tab) -> { + return tabs.stream().map((tab) -> { ColumnMetaData.fillSelectedTabMeta(tab, (t) -> { return tabsMeta.get(t.getName()); }); @@ -107,12 +115,13 @@ public final List getSelectedTabs() { throw new RuntimeException(e); } } - this.preSelectedTabsHash = selectedTabs.hashCode(); - return this.selectedTabs; - + return tabs; } protected boolean shallFillSelectedTabMeta() { + if (CollectionUtils.isEmpty(this.selectedTabs)) { + return true; + } for (SelectedTab tab : this.selectedTabs) { for (CMeta c : tab.cols) { return (c.getType() == null); @@ -123,6 +132,7 @@ protected boolean shallFillSelectedTabMeta() { protected abstract RdbmsReaderContext createDataXReaderContext(String jobName, SelectedTab tab, IDataSourceDumper dumper); + @Override public void setKey(KeyedPluginStore.Key key) { this.dataXName = key.keyVal.getVal(); diff --git a/tis-datax/tis-datax-common-plugin/src/main/java/com/qlangtech/tis/plugin/datax/common/RdbmsWriterContext.java b/tis-datax/tis-datax-common-plugin/src/main/java/com/qlangtech/tis/plugin/datax/common/RdbmsWriterContext.java index 076e5c0e2..7c90b6e32 100644 --- a/tis-datax/tis-datax-common-plugin/src/main/java/com/qlangtech/tis/plugin/datax/common/RdbmsWriterContext.java +++ b/tis-datax/tis-datax-common-plugin/src/main/java/com/qlangtech/tis/plugin/datax/common/RdbmsWriterContext.java @@ -20,6 +20,8 @@ import com.qlangtech.tis.datax.IDataxContext; import com.qlangtech.tis.datax.IDataxProcessor; +import com.qlangtech.tis.datax.IDataxProcessor.TabCols; +import com.qlangtech.tis.plugin.datax.transformer.RecordTransformerRules; import com.qlangtech.tis.plugin.ds.BasicDataSourceFactory; import org.apache.commons.lang.StringUtils; @@ -35,10 +37,12 @@ public abstract class RdbmsWriterContext implements IDataxContext { private final String tableName; - public RdbmsWriterContext(WRITER writer, IDataxProcessor.TableMap tabMapper) { + public RdbmsWriterContext(WRITER writer, IDataxProcessor.TableMap tabMapper, Optional< RecordTransformerRules> transformerRules) { super(writer, (DS) writer.getDataSourceFactory()); this.tableName = tabMapper.getTo(); - this.setCols(tabMapper.getSourceCols().stream().map((c) -> c.getName()).collect(Collectors.toList())); + TabCols tabCols = TabCols.create(dsFactory, tabMapper, transformerRules); + // this.setCols(tabMapper.getSourceCols().stream().map((c) -> c.getName()).collect(Collectors.toList())); + this.setCols(tabCols.getRawCols()); } public String getTableName() { diff --git a/tis-datax/tis-datax-dameng-plugin/src/main/java/com/qlangtech/tis/plugin/datax/dameng/writer/DataXDaMengWriter.java b/tis-datax/tis-datax-dameng-plugin/src/main/java/com/qlangtech/tis/plugin/datax/dameng/writer/DataXDaMengWriter.java index 0e8c49cdf..15d4e2ce5 100644 --- a/tis-datax/tis-datax-dameng-plugin/src/main/java/com/qlangtech/tis/plugin/datax/dameng/writer/DataXDaMengWriter.java +++ b/tis-datax/tis-datax-dameng-plugin/src/main/java/com/qlangtech/tis/plugin/datax/dameng/writer/DataXDaMengWriter.java @@ -44,7 +44,7 @@ public static String getDftTemplate() { @Override - public IDataxContext getSubTask(Optional tableMap) { + public IDataxContext getSubTask(Optional tableMap, Optional transformerRules) { if (!tableMap.isPresent()) { throw new IllegalArgumentException("param tableMap shall be present"); } @@ -68,7 +68,7 @@ public IDataxContext getSubTask(Optional tableMap) { context.setUsername(dsFactory.userName); context.setTabName(table.getTableName()); context.cols = IDataxProcessor.TabCols.create(new IDBReservedKeys() { - }, tm); + }, tm, transformerRules); context.setDbName(this.dbName); // context.writeMode = this.writeMode; context.setPreSql(this.preSql); @@ -151,7 +151,7 @@ public CreateTableSqlBuilder.CreateDDL generateCreateDDL(IDataxProcessor.TableMa CreateTableSqlBuilder.CreateDDL createDDL = null; - final CreateTableSqlBuilder createTableSqlBuilder = new CreateTableSqlBuilder(tableMapper, this.getDataSourceFactory(),transformers) { + final CreateTableSqlBuilder createTableSqlBuilder = new CreateTableSqlBuilder(tableMapper, this.getDataSourceFactory(), transformers) { @Override protected void appendExtraColDef(List pks) { if (CollectionUtils.isEmpty(pks)) { diff --git a/tis-datax/tis-datax-dameng-plugin/src/test/java/com/qlangtech/tis/plugin/datax/dameng/writer/TestDataXDaMengWriter.java b/tis-datax/tis-datax-dameng-plugin/src/test/java/com/qlangtech/tis/plugin/datax/dameng/writer/TestDataXDaMengWriter.java index 344a3aa73..9576f0d0c 100644 --- a/tis-datax/tis-datax-dameng-plugin/src/test/java/com/qlangtech/tis/plugin/datax/dameng/writer/TestDataXDaMengWriter.java +++ b/tis-datax/tis-datax-dameng-plugin/src/test/java/com/qlangtech/tis/plugin/datax/dameng/writer/TestDataXDaMengWriter.java @@ -211,7 +211,7 @@ public void testTempateGenerate() throws Exception { private void validateConfigGenerate(String assertFileName, DataXDaMengWriter mySQLWriter) throws IOException { Optional tableMap = TestSelectedTabs.createTableMapper(); - IDataxContext subTaskCtx = mySQLWriter.getSubTask(tableMap); + IDataxContext subTaskCtx = mySQLWriter.getSubTask(tableMap,Optional.empty()); Assert.assertNotNull(subTaskCtx); RdbmsDataxContext mySQLDataxContext = (RdbmsDataxContext) subTaskCtx; diff --git a/tis-datax/tis-datax-dfs-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXDFSWriter.java b/tis-datax/tis-datax-dfs-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXDFSWriter.java index e0b20fb9f..11c11247f 100644 --- a/tis-datax/tis-datax-dfs-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXDFSWriter.java +++ b/tis-datax/tis-datax-dfs-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXDFSWriter.java @@ -26,7 +26,6 @@ import com.qlangtech.tis.datax.IDataXGenerateCfgs; import com.qlangtech.tis.datax.IDataxContext; import com.qlangtech.tis.datax.IDataxProcessor; -import com.qlangtech.tis.datax.impl.DataXCfgGenerator; import com.qlangtech.tis.datax.impl.DataxWriter; import com.qlangtech.tis.exec.ExecutePhaseRange; import com.qlangtech.tis.exec.IExecChainContext; @@ -40,6 +39,7 @@ import com.qlangtech.tis.plugin.annotation.Validator; import com.qlangtech.tis.plugin.datax.format.FileFormat; import com.qlangtech.tis.plugin.datax.meta.MetaDataWriter; +import com.qlangtech.tis.plugin.datax.transformer.RecordTransformerRules; import com.qlangtech.tis.plugin.ds.ISelectedTab; import com.qlangtech.tis.plugin.tdfs.ITDFSSession; import com.qlangtech.tis.plugin.tdfs.TDFSLinker; @@ -118,14 +118,14 @@ public String getTemplate() { } @Override - public IDataxContext getSubTask(Optional tableMap) { - DataXDFSWriterContext writerContext = new DataXDFSWriterContext(this, tableMap.get()); + public IDataxContext getSubTask(Optional tableMap, Optional transformerRules) { + DataXDFSWriterContext writerContext = new DataXDFSWriterContext(this, tableMap.get(), transformerRules); return writerContext; } @TISExtension() - public static class DefaultDescriptor extends BaseDataxWriterDescriptor { + public static class DefaultDescriptor extends BaseDataxWriterDescriptor { public DefaultDescriptor() { super(); // registerSelectOptions(KEY_FTP_SERVER_LINK, () -> ParamsConfig.getItems(FTPServer.FTP_SERVER)); diff --git a/tis-datax/tis-datax-dfs-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXDFSWriterContext.java b/tis-datax/tis-datax-dfs-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXDFSWriterContext.java index 042a81fb4..dda31fe7d 100644 --- a/tis-datax/tis-datax-dfs-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXDFSWriterContext.java +++ b/tis-datax/tis-datax-dfs-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXDFSWriterContext.java @@ -20,11 +20,14 @@ import com.qlangtech.tis.datax.IDataxContext; import com.qlangtech.tis.datax.IDataxProcessor; +import com.qlangtech.tis.plugin.datax.transformer.RecordTransformerRules; import com.qlangtech.tis.plugin.ds.CMeta; +import com.qlangtech.tis.plugin.ds.IColMetaGetter; import org.apache.commons.lang.StringUtils; import java.util.List; import java.util.Objects; +import java.util.Optional; import java.util.stream.Collectors; /** @@ -35,55 +38,20 @@ public class DataXDFSWriterContext implements IDataxContext { private final DataXDFSWriter writer; private final IDataxProcessor.TableMap tableMapper; // private final FTPServer ftpServer; + private final List cols; - public DataXDFSWriterContext(DataXDFSWriter writer, IDataxProcessor.TableMap tableMapper) { + public DataXDFSWriterContext(DataXDFSWriter writer, IDataxProcessor.TableMap tableMapper, Optional transformerRules) { this.writer = writer; Objects.requireNonNull(writer.fileFormat, "prop fileFormat can not be null"); Objects.requireNonNull(writer.dfsLinker, "prop linker can not be null"); this.tableMapper = tableMapper; + + this.cols = transformerRules.map((rule) -> rule.overwriteCols(tableMapper.getSourceCols())) + .orElseGet(() -> tableMapper.getSourceCols().stream().collect(Collectors.toList())); + // this.ftpServer = FTPServer.getServer(writer.linker); } - // public String getProtocol() { -// return this.ftpServer.protocol; -// } -// -// public String getHost() { -// return this.ftpServer.host; -// } -// -// public boolean isContainPort() { -// return this.ftpServer.port != null; -// } -// -// public Integer getPort() { -// return this.ftpServer.port; -// } -// -// public boolean isContainTimeout() { -// return this.ftpServer.timeout != null; -// } -// -// public Integer getTimeout() { -// return this.ftpServer.timeout; -// } -// -// public String getUsername() { -// return this.ftpServer.username; -// } -// -// public String getPassword() { -// return this.ftpServer.password; -// } -// -// public boolean isContainConnectPattern() { -// return StringUtils.isNotBlank(this.ftpServer.connectPattern); -// } -// -// public String getConnectPattern() { -// return this.ftpServer.connectPattern; -// } -// public String getPath() { String path = null; if (StringUtils.isEmpty(path = this.writer.dfsLinker.getRootPath())) { @@ -113,7 +81,7 @@ public boolean isContainEncoding() { } public String getEncoding() { - // return this.writer.encoding; + // return this.writer.encoding; throw new UnsupportedOperationException(); } @@ -162,12 +130,12 @@ public String getSuffix() { } public boolean isContainHeader() { - List cols = tableMapper.getSourceCols(); + // List cols = tableMapper.getSourceCols(); return (this.writer.fileFormat.containHeader() && cols.size() > 0); } public String getHeader() { - return tableMapper.getSourceCols().stream().map((c) -> "'" + c.getName() + "'").collect(Collectors.joining(",")); + return this.cols.stream().map((c) -> "'" + c.getName() + "'").collect(Collectors.joining(",")); } } diff --git a/tis-datax/tis-datax-doris-plugin/src/main/java/com/qlangtech/tis/plugin/datax/doris/DataXDorisWriter.java b/tis-datax/tis-datax-doris-plugin/src/main/java/com/qlangtech/tis/plugin/datax/doris/DataXDorisWriter.java index 5dc2c3520..7ccb3bc2f 100644 --- a/tis-datax/tis-datax-doris-plugin/src/main/java/com/qlangtech/tis/plugin/datax/doris/DataXDorisWriter.java +++ b/tis-datax/tis-datax-doris-plugin/src/main/java/com/qlangtech/tis/plugin/datax/doris/DataXDorisWriter.java @@ -73,11 +73,11 @@ public boolean isGenerateCreateDDLSwitchOff() { } @Override - public IDataxContext getSubTask(Optional tableMap) { + public IDataxContext getSubTask(Optional tableMap, Optional transformerRules) { if (!tableMap.isPresent()) { throw new IllegalStateException("tableMap must be present"); } - return new DorisWriterContext(this, tableMap.get()); + return new DorisWriterContext(this, tableMap.get(), transformerRules); } @Override @@ -106,7 +106,6 @@ protected String getDecimalToken() { }; - @Override protected BasicCreateTableSqlBuilder createSQLDDLBuilder(IDataxProcessor.TableMap tableMapper, Optional transformers) { return new BasicCreateTableSqlBuilder(tableMapper, this.getDataSourceFactory(), columnTokenRecognise, transformers) { diff --git a/tis-datax/tis-datax-doris-plugin/src/main/java/com/qlangtech/tis/plugin/datax/doris/DorisWriterContext.java b/tis-datax/tis-datax-doris-plugin/src/main/java/com/qlangtech/tis/plugin/datax/doris/DorisWriterContext.java index 4d681dc6c..edfb35d78 100644 --- a/tis-datax/tis-datax-doris-plugin/src/main/java/com/qlangtech/tis/plugin/datax/doris/DorisWriterContext.java +++ b/tis-datax/tis-datax-doris-plugin/src/main/java/com/qlangtech/tis/plugin/datax/doris/DorisWriterContext.java @@ -22,11 +22,14 @@ import com.alibaba.fastjson.JSONObject; import com.qlangtech.tis.datax.IDataxProcessor; import com.qlangtech.tis.plugin.datax.common.RdbmsWriterContext; +import com.qlangtech.tis.plugin.datax.transformer.RecordTransformerRules; import com.qlangtech.tis.plugin.ds.ISelectedTab; import com.qlangtech.tis.plugin.ds.doris.DorisSourceFactory; import com.qlangtech.tis.trigger.util.JsonUtil; import org.apache.commons.lang.StringUtils; +import java.util.Optional; + /** * @author: 百岁(baisui@qlangtech.com) * @create: 2021-09-07 09:58 @@ -34,8 +37,8 @@ public class DorisWriterContext extends RdbmsWriterContext { private final ISelectedTab dorisTab; - public DorisWriterContext(DataXDorisWriter writer, IDataxProcessor.TableMap tabMapper) { - super(writer, tabMapper); + public DorisWriterContext(DataXDorisWriter writer, IDataxProcessor.TableMap tabMapper, Optional transformerRules) { + super(writer, tabMapper, transformerRules); this.dorisTab = tabMapper.getSourceTab(); } diff --git a/tis-datax/tis-datax-elasticsearch-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXElasticsearchWriter.java b/tis-datax/tis-datax-elasticsearch-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXElasticsearchWriter.java index 86c39a0d8..76f58ced2 100644 --- a/tis-datax/tis-datax-elasticsearch-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXElasticsearchWriter.java +++ b/tis-datax/tis-datax-elasticsearch-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXElasticsearchWriter.java @@ -42,6 +42,7 @@ import com.qlangtech.tis.plugin.annotation.FormFieldType; import com.qlangtech.tis.plugin.annotation.Validator; import com.qlangtech.tis.plugin.datax.elastic.ElasticEndpoint; +import com.qlangtech.tis.plugin.datax.transformer.RecordTransformerRules; import com.qlangtech.tis.plugin.ds.CMeta; import com.qlangtech.tis.plugin.ds.DataType; import com.qlangtech.tis.plugin.ds.DataXReaderColType; @@ -492,7 +493,7 @@ public String getTemplate() { @Override - public IDataxContext getSubTask(Optional tableMap) { + public IDataxContext getSubTask(Optional tableMap, Optional transformerRules) { if (!tableMap.isPresent()) { throw new IllegalStateException("tableMap must be present"); diff --git a/tis-datax/tis-datax-hdfs-plugin/src/main/java/com/qlangtech/tis/plugin/datax/BasicFSWriter.java b/tis-datax/tis-datax-hdfs-plugin/src/main/java/com/qlangtech/tis/plugin/datax/BasicFSWriter.java index 313892747..6b764e9bc 100644 --- a/tis-datax/tis-datax-hdfs-plugin/src/main/java/com/qlangtech/tis/plugin/datax/BasicFSWriter.java +++ b/tis-datax/tis-datax-hdfs-plugin/src/main/java/com/qlangtech/tis/plugin/datax/BasicFSWriter.java @@ -28,6 +28,7 @@ import com.qlangtech.tis.plugin.annotation.FormField; import com.qlangtech.tis.plugin.annotation.FormFieldType; import com.qlangtech.tis.plugin.annotation.Validator; +import com.qlangtech.tis.plugin.datax.transformer.RecordTransformerRules; import com.qlangtech.tis.plugin.ds.CMeta; import org.apache.commons.lang.StringUtils; @@ -97,7 +98,7 @@ public FileSystemFactory getFsFactory() { } @Override - public final IDataxContext getSubTask(Optional tableMap) { + public final IDataxContext getSubTask(Optional tableMap, Optional transformerRules) { if (!tableMap.isPresent()) { throw new IllegalArgumentException("param tableMap shall be present"); } diff --git a/tis-datax/tis-datax-kafka-plugin/src/main/java/com/qlangtech/tis/plugins/datax/kafka/writer/DataXKafkaWriter.java b/tis-datax/tis-datax-kafka-plugin/src/main/java/com/qlangtech/tis/plugins/datax/kafka/writer/DataXKafkaWriter.java index 050dac85f..2dc0bb010 100644 --- a/tis-datax/tis-datax-kafka-plugin/src/main/java/com/qlangtech/tis/plugins/datax/kafka/writer/DataXKafkaWriter.java +++ b/tis-datax/tis-datax-kafka-plugin/src/main/java/com/qlangtech/tis/plugins/datax/kafka/writer/DataXKafkaWriter.java @@ -36,6 +36,7 @@ import com.qlangtech.tis.plugin.annotation.FormFieldType; import com.qlangtech.tis.plugin.annotation.Validator; import com.qlangtech.tis.plugin.datax.SelectedTab; +import com.qlangtech.tis.plugin.datax.transformer.RecordTransformerRules; import com.qlangtech.tis.plugins.datax.kafka.writer.protocol.KafkaProtocol; import com.qlangtech.tis.realtime.transfer.DTO; import com.qlangtech.tis.runtime.module.misc.IControlMsgHandler; @@ -147,7 +148,7 @@ public String getTemplate() { } @Override - public IDataxContext getSubTask(Optional tableMap) { + public IDataxContext getSubTask(Optional tableMap,Optional transformerRules) { return null; } diff --git a/tis-datax/tis-datax-local-executor-utils/src/main/java/com/qlangtech/tis/datax/DataxExecutor.java b/tis-datax/tis-datax-local-executor-utils/src/main/java/com/qlangtech/tis/datax/DataxExecutor.java index 3c804fe8f..85ea0e5a2 100644 --- a/tis-datax/tis-datax-local-executor-utils/src/main/java/com/qlangtech/tis/datax/DataxExecutor.java +++ b/tis-datax/tis-datax-local-executor-utils/src/main/java/com/qlangtech/tis/datax/DataxExecutor.java @@ -487,7 +487,10 @@ public int getTaskSerializeNum() { public String getTISDataXName() { return this.dataXName.getTISDataXName(); } - + @Override + public String getDataXName() { + return this.dataXName.getTISDataXName(); + } @Override protected StandAloneJobContainerCommunicator createContainerCommunicator(Configuration configuration) { return new StandAloneJobContainerCommunicator(configuration) { diff --git a/tis-datax/tis-datax-local-powerjob-executor/src/main/java/com/qlangtech/tis/plugin/datax/powerjob/K8SDataXPowerJobOverwriteTemplate.java b/tis-datax/tis-datax-local-powerjob-executor/src/main/java/com/qlangtech/tis/plugin/datax/powerjob/K8SDataXPowerJobOverwriteTemplate.java index a47e65b72..ea4a9084d 100644 --- a/tis-datax/tis-datax-local-powerjob-executor/src/main/java/com/qlangtech/tis/plugin/datax/powerjob/K8SDataXPowerJobOverwriteTemplate.java +++ b/tis-datax/tis-datax-local-powerjob-executor/src/main/java/com/qlangtech/tis/plugin/datax/powerjob/K8SDataXPowerJobOverwriteTemplate.java @@ -22,20 +22,10 @@ public class K8SDataXPowerJobOverwriteTemplate extends K8SDataXPowerJobJobTempla @Override public SaveWorkflowRequest createWorkflowRequest(IDataxProcessor dataxProcessor) { SaveWorkflowRequest req = super.createWorkflowRequest(dataxProcessor); -// req.setWfName(dataxProcessor.identityValue()); -// req.setWfDescription(dataxProcessor.identityValue()); -// req.setEnable(true); Objects.requireNonNull(this.triggerStrategy, "triggerStrategy can not be null").setTimeExpression(req); return req; } -// @Override -// public void afterSaved() { -// PowerJobClient powerJob = DistributedPowerJobDataXJobSubmit.getTISPowerJob(); -// -// DistributedPowerJobDataXJobSubmit.innerSaveJob( ); -// } - @TISExtension() public static class DescriptorImpl extends BasicDescriptor implements IEndTypeGetter { diff --git a/tis-datax/tis-datax-mongodb-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXMongodbWriter.java b/tis-datax/tis-datax-mongodb-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXMongodbWriter.java index af2899d35..7a0e966f2 100644 --- a/tis-datax/tis-datax-mongodb-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXMongodbWriter.java +++ b/tis-datax/tis-datax-mongodb-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXMongodbWriter.java @@ -37,6 +37,7 @@ import com.qlangtech.tis.plugin.annotation.FormFieldType; import com.qlangtech.tis.plugin.annotation.Validator; import com.qlangtech.tis.plugin.datax.mongo.MongoWriterSelectedTab; +import com.qlangtech.tis.plugin.datax.transformer.RecordTransformerRules; import com.qlangtech.tis.plugin.ds.DataSourceFactory; import com.qlangtech.tis.plugin.ds.IDataSourceFactoryGetter; import com.qlangtech.tis.plugin.ds.PostedDSProp; @@ -136,12 +137,12 @@ public String getTemplate() { } @Override - public IDataxContext getSubTask(Optional tableMap) { + public IDataxContext getSubTask(Optional tableMap, Optional transformerRules) { // if (tableMap.isPresent()) { // throw new IllegalStateException("tableMap must not be present"); // } MongoDBWriterContext context = new MongoDBWriterContext(this, - tableMap.orElseThrow(() -> new IllegalStateException("tableMap can not be null"))); + tableMap.orElseThrow(() -> new IllegalStateException("tableMap can not be null")), transformerRules); return context; } diff --git a/tis-datax/tis-datax-mongodb-plugin/src/main/java/com/qlangtech/tis/plugin/datax/MongoDBWriterContext.java b/tis-datax/tis-datax-mongodb-plugin/src/main/java/com/qlangtech/tis/plugin/datax/MongoDBWriterContext.java index 6fc7c09f9..b6806b3b1 100644 --- a/tis-datax/tis-datax-mongodb-plugin/src/main/java/com/qlangtech/tis/plugin/datax/MongoDBWriterContext.java +++ b/tis-datax/tis-datax-mongodb-plugin/src/main/java/com/qlangtech/tis/plugin/datax/MongoDBWriterContext.java @@ -23,11 +23,17 @@ import com.qlangtech.tis.datax.IDataxContext; import com.qlangtech.tis.datax.IDataxProcessor; import com.qlangtech.tis.plugin.datax.mongo.MongoWriterSelectedTab; +import com.qlangtech.tis.plugin.datax.transformer.RecordTransformerRules; +import com.qlangtech.tis.plugin.ds.IColMetaGetter; import com.qlangtech.tis.plugin.ds.ISelectedTab; import com.qlangtech.tis.trigger.util.JsonUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + /** * @author: 百岁(baisui@qlangtech.com) * @create: 2021-06-20 14:01 @@ -37,13 +43,14 @@ public class MongoDBWriterContext extends BasicMongoDBContext implements IDataxC private final DataXMongodbWriter writer; private final IDataxProcessor.TableMap tableMapMapper; private final MongoWriterSelectedTab selectedTab; + private final Optional transformerRules; - - public MongoDBWriterContext(DataXMongodbWriter writer, IDataxProcessor.TableMap tableMapMapper) { + public MongoDBWriterContext(DataXMongodbWriter writer, IDataxProcessor.TableMap tableMapMapper, Optional transformerRules) { super(writer.getDsFactory()); this.tableMapMapper = tableMapMapper; this.writer = writer; this.selectedTab = (MongoWriterSelectedTab) tableMapMapper.getSourceTab(); + this.transformerRules = transformerRules; } /** @@ -51,7 +58,7 @@ public MongoDBWriterContext(DataXMongodbWriter writer, IDataxProcessor.TableMap * * @return */ - private static String getDftColumn(ISelectedTab tab) { + private static String getDftColumn(ISelectedTab tab, Optional transformerRules) { //[{"name":"user_id","type":"string"},{"name":"user_name","type":"array","splitter":","}] JSONArray fields = new JSONArray(); @@ -62,12 +69,14 @@ private static String getDftColumn(ISelectedTab tab) { // } // try { - // List selectedTabs = dataReader.getSelectedTabs(); - // if (CollectionUtils.isEmpty(selectedTabs)) { - // return "[]"; - // } - // for (ISelectedTab tab : selectedTabs) { - tab.getCols().forEach((col) -> { + List cols = null; + if (transformerRules.isPresent()) { + cols = transformerRules.get().overwriteCols(tab.getCols()); + } else { + cols = tab.getCols().stream().collect(Collectors.toList()); + } + + cols.forEach((col) -> { JSONObject field = new JSONObject(); field.put("name", col.getName()); field.put("type", col.getType().getCollapse().getLiteria()); @@ -93,7 +102,7 @@ public String getCollectionName() { } public String getColumn() { - return getDftColumn(tableMapMapper.getSourceTab()); + return getDftColumn(tableMapMapper.getSourceTab(), this.transformerRules); } diff --git a/tis-datax/tis-datax-odps-plugin/pom.xml b/tis-datax/tis-datax-odps-plugin/pom.xml index ec00dd9f8..a3b7615e2 100644 --- a/tis-datax/tis-datax-odps-plugin/pom.xml +++ b/tis-datax/tis-datax-odps-plugin/pom.xml @@ -53,6 +53,10 @@ odps-jdbc 3.3.6 + + com.qlangtech.tis.plugins + tis-datax-test-common + diff --git a/tis-datax/tis-datax-odps-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXOdpsWriter.java b/tis-datax/tis-datax-odps-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXOdpsWriter.java index 8d68a0430..c318fc1a1 100644 --- a/tis-datax/tis-datax-odps-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXOdpsWriter.java +++ b/tis-datax/tis-datax-odps-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXOdpsWriter.java @@ -8,6 +8,7 @@ import com.qlangtech.tis.datax.IDataXGenerateCfgs; import com.qlangtech.tis.datax.IDataxContext; import com.qlangtech.tis.datax.IDataxProcessor; +import com.qlangtech.tis.datax.IDataxProcessor.TabCols; import com.qlangtech.tis.datax.IDataxProcessor.TableMap; import com.qlangtech.tis.datax.TimeFormat; import com.qlangtech.tis.datax.impl.DataXCfgGenerator; @@ -193,7 +194,6 @@ public DataflowTask createTask(ISqlTask nodeMeta, boolean isFinalNode, IExecChai } - /** * https://help.aliyun.com/document_detail/73768.html#section-ixi-bgd-948 * @@ -201,9 +201,9 @@ public DataflowTask createTask(ISqlTask nodeMeta, boolean isFinalNode, IExecChai * @return */ @Override - public CreateTableSqlBuilder.CreateDDL generateCreateDDL(IDataxProcessor.TableMap tableMapper,Optional transformers) { + public CreateTableSqlBuilder.CreateDDL generateCreateDDL(IDataxProcessor.TableMap tableMapper, Optional transformers) { final CreateTableSqlBuilder createTableSqlBuilder - = new CreateTableSqlBuilder(tableMapper, this.getDataSourceFactory(),transformers) { + = new CreateTableSqlBuilder(tableMapper, this.getDataSourceFactory(), transformers) { @Override protected CreateTableName getCreateTableName() { @@ -345,12 +345,12 @@ public DataSourceMeta.JDBCConnection getConnection() { } @Override - public IDataxContext getSubTask(Optional tableMap) { + public IDataxContext getSubTask(Optional tableMap, Optional transformerRules) { if (!tableMap.isPresent()) { throw new IllegalArgumentException("param tableMap shall be present"); } - return new OdpsContext(this, tableMap.get()); + return new OdpsContext(this, tableMap.get(), transformerRules); } @Override @@ -369,11 +369,12 @@ public static class OdpsContext implements IDataxContext { private final IDataxProcessor.TableMap tableMapper; private final AccessKey accessKey; private final OdpsDataSourceFactory dsFactory; + private final List cols; - public OdpsContext(DataXOdpsWriter odpsWriter, IDataxProcessor.TableMap tableMapper) { + public OdpsContext(DataXOdpsWriter odpsWriter, IDataxProcessor.TableMap tableMapper, Optional transformerRules) { this.odpsWriter = odpsWriter; this.tableMapper = tableMapper; - + this.cols = TabCols.create(null, tableMapper, transformerRules).getRawCols(); this.dsFactory = odpsWriter.getDataSourceFactory(); this.accessKey = this.dsFactory.getAccessKey(); @@ -388,8 +389,9 @@ public String getProject() { } public List getColumn() { - return this.tableMapper.getSourceCols() - .stream().map((col) -> col.getName()).collect(Collectors.toList()); +// return this.tableMapper.getSourceCols() +// .stream().map((col) -> col.getName()).collect(Collectors.toList()); + return this.cols; } public String getAccessId() { diff --git a/tis-datax/tis-datax-odps-plugin/src/test/java/com/qlangtech/tis/plugin/datax/TestDataXOdpsWriter.java b/tis-datax/tis-datax-odps-plugin/src/test/java/com/qlangtech/tis/plugin/datax/TestDataXOdpsWriter.java index 4a5441f7a..bec9011f0 100644 --- a/tis-datax/tis-datax-odps-plugin/src/test/java/com/qlangtech/tis/plugin/datax/TestDataXOdpsWriter.java +++ b/tis-datax/tis-datax-odps-plugin/src/test/java/com/qlangtech/tis/plugin/datax/TestDataXOdpsWriter.java @@ -9,7 +9,8 @@ import com.qlangtech.tis.offline.FileSystemFactory; import com.qlangtech.tis.plugin.aliyun.AccessKey; import com.qlangtech.tis.plugin.datax.odps.OdpsDataSourceFactory; -import com.qlangtech.tis.plugin.datax.test.TestSelectedTabs; + +import com.qlangtech.tis.plugin.datax.test.TestSelectedTabs; import com.qlangtech.tis.plugin.ds.CMeta; import com.qlangtech.tis.plugin.ds.DataXReaderColType; import com.qlangtech.tis.trigger.util.JsonUtil; @@ -85,7 +86,7 @@ private void validateConfigGenerate(String assertFileName, OdpsDataSourceFactory Optional tableMap = TestSelectedTabs.createTableMapper(); IDataxProcessor.TableMap tab = tableMap.get(); - IDataxContext subTaskCtx = odpsWriter.getSubTask(tableMap); + IDataxContext subTaskCtx = odpsWriter.getSubTask(tableMap, Optional.empty()); Assert.assertNotNull(subTaskCtx); DataXOdpsWriter.OdpsContext odpsWriterContext = (DataXOdpsWriter.OdpsContext) subTaskCtx; @@ -129,7 +130,7 @@ public void testGenerateCreateDDL() { final DataXOdpsWriter writer = createDataXOdpsWriter(); Assert.assertFalse(writer.isGenerateCreateDDLSwitchOff()); - // EasyMock.replay(fsFactory, fs); + // EasyMock.replay(fsFactory, fs); CreateTableSqlBuilder.CreateDDL ddl = writer.generateCreateDDL(getTabApplication((cols) -> { CMeta col = new CMeta(); col.setPk(true); diff --git a/tis-datax/tis-datax-oracle-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXOracleWriter.java b/tis-datax/tis-datax-oracle-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXOracleWriter.java index d5e1e3a7a..304f3bf15 100644 --- a/tis-datax/tis-datax-oracle-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXOracleWriter.java +++ b/tis-datax/tis-datax-oracle-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXOracleWriter.java @@ -50,11 +50,11 @@ public static String getDftTemplate() { } @Override - public IDataxContext getSubTask(Optional tableMap) { + public IDataxContext getSubTask(Optional tableMap, Optional transformerRules) { if (!tableMap.isPresent()) { throw new IllegalStateException("tableMap must be present"); } - OracleWriterContext writerContext = new OracleWriterContext(this, tableMap.get()); + OracleWriterContext writerContext = new OracleWriterContext(this, tableMap.get(), transformerRules); return writerContext; } diff --git a/tis-datax/tis-datax-oracle-plugin/src/main/java/com/qlangtech/tis/plugin/datax/OracleWriterContext.java b/tis-datax/tis-datax-oracle-plugin/src/main/java/com/qlangtech/tis/plugin/datax/OracleWriterContext.java index dbceb2456..a56eaf583 100644 --- a/tis-datax/tis-datax-oracle-plugin/src/main/java/com/qlangtech/tis/plugin/datax/OracleWriterContext.java +++ b/tis-datax/tis-datax-oracle-plugin/src/main/java/com/qlangtech/tis/plugin/datax/OracleWriterContext.java @@ -1,19 +1,19 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package com.qlangtech.tis.plugin.datax; @@ -21,17 +21,20 @@ import com.qlangtech.tis.datax.IDataxProcessor; import com.qlangtech.tis.plugin.datax.common.RdbmsReaderContext; import com.qlangtech.tis.plugin.datax.common.RdbmsWriterContext; +import com.qlangtech.tis.plugin.datax.transformer.RecordTransformerRules; import com.qlangtech.tis.plugin.ds.IDataSourceDumper; import com.qlangtech.tis.plugin.ds.oracle.OracleDataSourceFactory; import org.apache.commons.lang.StringUtils; +import java.util.Optional; + /** * @author: 百岁(baisui@qlangtech.com) * @create: 2021-06-24 15:41 **/ public class OracleWriterContext extends RdbmsWriterContext { - public OracleWriterContext(DataXOracleWriter writer, IDataxProcessor.TableMap tabMapper) { - super(writer, tabMapper); + public OracleWriterContext(DataXOracleWriter writer, IDataxProcessor.TableMap tabMapper, Optional transformerRules) { + super(writer, tabMapper, transformerRules); } @Override diff --git a/tis-datax/tis-datax-postgresql-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXPostgresqlWriter.java b/tis-datax/tis-datax-postgresql-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXPostgresqlWriter.java index 80c55da7f..4a2d849c0 100644 --- a/tis-datax/tis-datax-postgresql-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXPostgresqlWriter.java +++ b/tis-datax/tis-datax-postgresql-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXPostgresqlWriter.java @@ -180,8 +180,8 @@ public String smallIntType(DataType dataType) { } @Override - public IDataxContext getSubTask(Optional tableMap) { - PostgreWriterContext writerContext = new PostgreWriterContext(this, tableMap.get()); + public IDataxContext getSubTask(Optional tableMap, Optional transformerRules) { + PostgreWriterContext writerContext = new PostgreWriterContext(this, tableMap.get(), transformerRules); return writerContext; } diff --git a/tis-datax/tis-datax-postgresql-plugin/src/main/java/com/qlangtech/tis/plugin/datax/PostgreWriterContext.java b/tis-datax/tis-datax-postgresql-plugin/src/main/java/com/qlangtech/tis/plugin/datax/PostgreWriterContext.java index febd09f49..c46a49a4d 100644 --- a/tis-datax/tis-datax-postgresql-plugin/src/main/java/com/qlangtech/tis/plugin/datax/PostgreWriterContext.java +++ b/tis-datax/tis-datax-postgresql-plugin/src/main/java/com/qlangtech/tis/plugin/datax/PostgreWriterContext.java @@ -20,17 +20,20 @@ import com.qlangtech.tis.datax.IDataxProcessor; import com.qlangtech.tis.plugin.datax.common.RdbmsWriterContext; +import com.qlangtech.tis.plugin.datax.transformer.RecordTransformerRules; import com.qlangtech.tis.plugin.ds.postgresql.PGDataSourceFactory; import org.apache.commons.lang.StringUtils; +import java.util.Optional; + /** * @author: 百岁(baisui@qlangtech.com) * @create: 2021-06-23 15:43 **/ public class PostgreWriterContext extends RdbmsWriterContext { - public PostgreWriterContext(DataXPostgresqlWriter writer, IDataxProcessor.TableMap tabMapper) { - super(writer, tabMapper); + public PostgreWriterContext(DataXPostgresqlWriter writer, IDataxProcessor.TableMap tabMapper, Optional transformerRules) { + super(writer, tabMapper, transformerRules); } @Override diff --git a/tis-datax/tis-datax-sqlserver-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXSqlserverWriter.java b/tis-datax/tis-datax-sqlserver-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXSqlserverWriter.java index 6918ee6ca..5c2aca067 100644 --- a/tis-datax/tis-datax-sqlserver-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXSqlserverWriter.java +++ b/tis-datax/tis-datax-sqlserver-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXSqlserverWriter.java @@ -51,23 +51,23 @@ public static String getDftTemplate() { // } @Override - public IDataxContext getSubTask(Optional tableMap) { + public IDataxContext getSubTask(Optional tableMap, Optional transformerRules) { if (!tableMap.isPresent()) { throw new IllegalArgumentException("param tableMap shall be present"); } - SqlServerWriterContext writerContext = new SqlServerWriterContext(this, tableMap.get()); + SqlServerWriterContext writerContext = new SqlServerWriterContext(this, tableMap.get(), transformerRules); return writerContext; } @Override public CreateTableSqlBuilder.CreateDDL generateCreateDDL( - IDataxProcessor.TableMap tableMapper,Optional transformers) { + IDataxProcessor.TableMap tableMapper, Optional transformers) { // if (!this.autoCreateTable) { // return null; // } // https://www.cnblogs.com/mingfei200169/articles/427591.html - final CreateTableSqlBuilder createTableSqlBuilder = new CreateTableSqlBuilder(tableMapper, this.getDataSourceFactory(),transformers) { + final CreateTableSqlBuilder createTableSqlBuilder = new CreateTableSqlBuilder(tableMapper, this.getDataSourceFactory(), transformers) { private String convertType(IColMetaGetter col) { //https://www.cnblogs.com/liberty777/p/10748570.html diff --git a/tis-datax/tis-datax-sqlserver-plugin/src/main/java/com/qlangtech/tis/plugin/datax/SqlServerWriterContext.java b/tis-datax/tis-datax-sqlserver-plugin/src/main/java/com/qlangtech/tis/plugin/datax/SqlServerWriterContext.java index 5421cc51d..5d3b36cd2 100644 --- a/tis-datax/tis-datax-sqlserver-plugin/src/main/java/com/qlangtech/tis/plugin/datax/SqlServerWriterContext.java +++ b/tis-datax/tis-datax-sqlserver-plugin/src/main/java/com/qlangtech/tis/plugin/datax/SqlServerWriterContext.java @@ -20,16 +20,19 @@ import com.qlangtech.tis.datax.IDataxProcessor; import com.qlangtech.tis.plugin.datax.common.RdbmsWriterContext; +import com.qlangtech.tis.plugin.datax.transformer.RecordTransformerRules; import com.qlangtech.tis.plugin.ds.sqlserver.SqlServerDatasourceFactory; +import java.util.Optional; + /** * @author: 百岁(baisui@qlangtech.com) * @create: 2021-06-24 10:46 **/ public class SqlServerWriterContext extends RdbmsWriterContext { public static final String EscapeChar = "\\\""; - public SqlServerWriterContext(DataXSqlserverWriter writer, IDataxProcessor.TableMap tabMapper) { - super(writer, tabMapper); + public SqlServerWriterContext(DataXSqlserverWriter writer, IDataxProcessor.TableMap tabMapper, Optional transformerRules) { + super(writer, tabMapper,transformerRules); } @Override diff --git a/tis-datax/tis-datax-starrocks-plugin/src/main/java/com/qlangtech/tis/plugin/datax/BasicStarRocksWriter.java b/tis-datax/tis-datax-starrocks-plugin/src/main/java/com/qlangtech/tis/plugin/datax/BasicStarRocksWriter.java index 643e50617..eb28e2e5a 100644 --- a/tis-datax/tis-datax-starrocks-plugin/src/main/java/com/qlangtech/tis/plugin/datax/BasicStarRocksWriter.java +++ b/tis-datax/tis-datax-starrocks-plugin/src/main/java/com/qlangtech/tis/plugin/datax/BasicStarRocksWriter.java @@ -56,11 +56,11 @@ public abstract class BasicStarRocksWriter extends BasicDataXRdbmsWriter tableMap) { + public IDataxContext getSubTask(Optional tableMap, Optional transformerRules) { if (!tableMap.isPresent()) { throw new IllegalStateException("tableMap must be present"); } - return new StarRocksWriterContext(this, tableMap.get()); + return new StarRocksWriterContext(this, tableMap.get(), transformerRules); } /** diff --git a/tis-datax/tis-datax-starrocks-plugin/src/main/java/com/qlangtech/tis/plugin/datax/starrocks/StarRocksWriterContext.java b/tis-datax/tis-datax-starrocks-plugin/src/main/java/com/qlangtech/tis/plugin/datax/starrocks/StarRocksWriterContext.java index 1ab0ed6f6..f0b5b7e52 100644 --- a/tis-datax/tis-datax-starrocks-plugin/src/main/java/com/qlangtech/tis/plugin/datax/starrocks/StarRocksWriterContext.java +++ b/tis-datax/tis-datax-starrocks-plugin/src/main/java/com/qlangtech/tis/plugin/datax/starrocks/StarRocksWriterContext.java @@ -21,17 +21,21 @@ import com.qlangtech.tis.datax.IDataxProcessor; import com.qlangtech.tis.plugin.datax.BasicStarRocksWriter; import com.qlangtech.tis.plugin.datax.common.RdbmsWriterContext; +import com.qlangtech.tis.plugin.datax.transformer.RecordTransformerRules; import com.qlangtech.tis.plugin.ds.starrocks.StarRocksSourceFactory; import org.apache.commons.lang.StringUtils; +import java.util.Optional; + /** * @author: 百岁(baisui@qlangtech.com) * @create: 2021-09-07 09:58 **/ public class StarRocksWriterContext extends RdbmsWriterContext { - public StarRocksWriterContext(BasicStarRocksWriter writer, IDataxProcessor.TableMap tabMapper) { - super(writer, tabMapper); + public StarRocksWriterContext(BasicStarRocksWriter writer + , IDataxProcessor.TableMap tabMapper, Optional transformerRules) { + super(writer, tabMapper, transformerRules); } public String getDataXName() { diff --git a/tis-datax/tis-datax-test-common/src/main/java/com/qlangtech/tis/plugin/common/BasicTemplate.java b/tis-datax/tis-datax-test-common/src/main/java/com/qlangtech/tis/plugin/common/BasicTemplate.java index 2a370c8e7..e05facc4c 100644 --- a/tis-datax/tis-datax-test-common/src/main/java/com/qlangtech/tis/plugin/common/BasicTemplate.java +++ b/tis-datax/tis-datax-test-common/src/main/java/com/qlangtech/tis/plugin/common/BasicTemplate.java @@ -8,6 +8,7 @@ import com.qlangtech.tis.datax.TableAliasMapper; import com.qlangtech.tis.datax.impl.DataXCfgGenerator; import com.qlangtech.tis.plugin.StoreResourceType; +import com.qlangtech.tis.plugin.datax.transformer.RecordTransformerRules; import com.qlangtech.tis.plugin.trigger.JobTrigger; import com.qlangtech.tis.util.IPluginContext; import org.apache.commons.lang.StringUtils; @@ -127,7 +128,7 @@ public String identityValue() { } }) { @Override - protected String getTemplateContent(IDataxReaderContext readerContext, IDataxReader reader, IDataxWriter writer) { + protected String getTemplateContent(IDataxReaderContext readerContext, IDataxReader reader, IDataxWriter writer, RecordTransformerRules transformerRules) { return vmTplContent; } }; diff --git a/tis-datax/tis-datax-test-common/src/main/java/com/qlangtech/tis/plugin/common/ReaderTemplate.java b/tis-datax/tis-datax-test-common/src/main/java/com/qlangtech/tis/plugin/common/ReaderTemplate.java index f94495618..92823d1bf 100644 --- a/tis-datax/tis-datax-test-common/src/main/java/com/qlangtech/tis/plugin/common/ReaderTemplate.java +++ b/tis-datax/tis-datax-test-common/src/main/java/com/qlangtech/tis/plugin/common/ReaderTemplate.java @@ -35,6 +35,7 @@ import com.qlangtech.tis.extension.impl.IOUtils; import com.qlangtech.tis.plugin.StoreResourceType; import com.qlangtech.tis.plugin.datax.MockDataxReaderContext; +import com.qlangtech.tis.plugin.datax.transformer.RecordTransformerRules; import junit.framework.TestCase; import org.apache.commons.lang3.tuple.Pair; import org.easymock.EasyMock; @@ -61,7 +62,7 @@ public static void validateDataXReader(String assertFileName, String dataXName, IDataxWriter dataxWriter = EasyMock.mock("dataxWriter", IDataxWriter.class); // EasyMock.expect(processor.getWriter(null, true)).andReturn(dataxWriter).anyTimes(); IDataxContext dataxContext = EasyMock.mock("dataxWriterContext", IDataxContext.class); - EasyMock.expect(dataxWriter.getSubTask(Optional.empty())).andReturn(dataxContext).anyTimes(); + EasyMock.expect(dataxWriter.getSubTask(Optional.empty(), Optional.empty())).andReturn(dataxContext).anyTimes(); // EasyMock.expect(processor.getReader(null)).andReturn(dataxReader); @@ -107,7 +108,7 @@ public DataxWriter.BaseDataxWriterDescriptor getWriterDescriptor() { } @Override - public IDataxContext getSubTask(Optional tableMap) { + public IDataxContext getSubTask(Optional tableMap, Optional transformerRules) { return null; } }; diff --git a/tis-datax/tis-datax-test-common/src/main/java/com/qlangtech/tis/plugin/common/WriterTemplate.java b/tis-datax/tis-datax-test-common/src/main/java/com/qlangtech/tis/plugin/common/WriterTemplate.java index c65bcd95b..d472e7ffa 100644 --- a/tis-datax/tis-datax-test-common/src/main/java/com/qlangtech/tis/plugin/common/WriterTemplate.java +++ b/tis-datax/tis-datax-test-common/src/main/java/com/qlangtech/tis/plugin/common/WriterTemplate.java @@ -34,6 +34,7 @@ import com.qlangtech.tis.extension.impl.IOUtils; import com.qlangtech.tis.plugin.StoreResourceType; import com.qlangtech.tis.plugin.datax.MockDataxReaderContext; +import com.qlangtech.tis.plugin.datax.transformer.RecordTransformerRules; import com.qlangtech.tis.plugin.ds.CMeta; import com.qlangtech.tis.plugin.ds.DataXReaderColType; import com.qlangtech.tis.plugin.ds.ISelectedTab; @@ -168,7 +169,7 @@ private static String generateWriterCfg(DataxWriter dataXWriter MockDataxReaderContext mockReaderContext = new MockDataxReaderContext(); DataXCfgGenerator dataProcessor = new DataXCfgGenerator(null, BasicTest.testDataXName, processor) { @Override - protected String getTemplateContent(IDataxReaderContext readerContext,IDataxReader reader, IDataxWriter writer) { + protected String getTemplateContent(IDataxReaderContext readerContext, IDataxReader reader, IDataxWriter writer, RecordTransformerRules transformerRules) { return dataXWriter.getTemplate(); } // @Override diff --git a/tis-datax/tis-ds-mysql-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataxMySQLWriter.java b/tis-datax/tis-ds-mysql-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataxMySQLWriter.java index 3c06e83d0..f5f63ccc6 100644 --- a/tis-datax/tis-ds-mysql-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataxMySQLWriter.java +++ b/tis-datax/tis-ds-mysql-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataxMySQLWriter.java @@ -82,7 +82,7 @@ public WriteMode getWriteMode() { } @Override - public IDataxContext getSubTask(Optional tableMap) { + public IDataxContext getSubTask(Optional tableMap, Optional transformerRules) { if (!tableMap.isPresent()) { throw new IllegalArgumentException("param tableMap shall be present"); } @@ -105,7 +105,7 @@ public IDataxContext getSubTask(Optional tableMap) { context.password = dsFactory.password; context.username = dsFactory.userName; context.tabName = table.getTableName(); - context.cols = IDataxProcessor.TabCols.create(dsFactory, tm); + context.cols = IDataxProcessor.TabCols.create(dsFactory, tm, transformerRules); context.dbName = this.dbName; context.writeMode = this.writeMode; context.preSql = this.preSql; @@ -126,9 +126,10 @@ public CreateTableSqlBuilder.CreateDDL generateCreateDDL(IDataxProcessor.TableMa DataxReader threadBingDataXReader = DataxReader.getThreadBingDataXReader(); Objects.requireNonNull(threadBingDataXReader, "getThreadBingDataXReader can not be null"); try { - if (threadBingDataXReader instanceof DataxMySQLReader + if (threadBingDataXReader instanceof DataxMySQLReader // // 没有使用别名 - && tableMapper.hasNotUseAlias() && !transformers.isPresent()) { + && tableMapper.hasNotUseAlias() // + && !transformers.isPresent()) { DataxMySQLReader mySQLReader = (DataxMySQLReader) threadBingDataXReader; MySQLDataSourceFactory dsFactory = mySQLReader.getDataSourceFactory(); dsFactory.visitFirstConnection((c) -> { diff --git a/tis-datax/tis-ds-mysql-plugin/src/test/java/com/qlangtech/tis/plugin/datax/TestDataxMySQLWriter.java b/tis-datax/tis-ds-mysql-plugin/src/test/java/com/qlangtech/tis/plugin/datax/TestDataxMySQLWriter.java index d7066fd7e..d969b4875 100644 --- a/tis-datax/tis-ds-mysql-plugin/src/test/java/com/qlangtech/tis/plugin/datax/TestDataxMySQLWriter.java +++ b/tis-datax/tis-ds-mysql-plugin/src/test/java/com/qlangtech/tis/plugin/datax/TestDataxMySQLWriter.java @@ -224,7 +224,7 @@ public void testTempateGenerate() throws Exception { private void validateConfigGenerate(String assertFileName, DataxMySQLWriter mySQLWriter) throws IOException { Optional tableMap = TestSelectedTabs.createTableMapper(); - IDataxContext subTaskCtx = mySQLWriter.getSubTask(tableMap); + IDataxContext subTaskCtx = mySQLWriter.getSubTask(tableMap, Optional.empty()); Assert.assertNotNull(subTaskCtx); RdbmsDataxContext mySQLDataxContext = (RdbmsDataxContext) subTaskCtx; diff --git a/tis-datax/tis-hive-flat-table-builder-plugin/src/main/resources/com/qlangtech/tis/hive/reader/DataXHiveReader.selectedTabs.json b/tis-datax/tis-hive-flat-table-builder-plugin/src/main/resources/com/qlangtech/tis/hive/reader/DataXHiveReader.selectedTabs.json index 30ef44e92..25b2e4486 100644 --- a/tis-datax/tis-hive-flat-table-builder-plugin/src/main/resources/com/qlangtech/tis/hive/reader/DataXHiveReader.selectedTabs.json +++ b/tis-datax/tis-hive-flat-table-builder-plugin/src/main/resources/com/qlangtech/tis/hive/reader/DataXHiveReader.selectedTabs.json @@ -1,5 +1,5 @@ { "cols": { - "viewtype": "tuplelist" + "viewtype": "idlist" } } diff --git a/tis-incr/tis-realtime-flink/src/main/java/com/qlangtech/tis/realtime/BasicTISSinkFactory.java b/tis-incr/tis-realtime-flink/src/main/java/com/qlangtech/tis/realtime/BasicTISSinkFactory.java index c049937e5..3c8b68983 100644 --- a/tis-incr/tis-realtime-flink/src/main/java/com/qlangtech/tis/realtime/BasicTISSinkFactory.java +++ b/tis-incr/tis-realtime-flink/src/main/java/com/qlangtech/tis/realtime/BasicTISSinkFactory.java @@ -30,6 +30,7 @@ import com.qlangtech.tis.plugins.incr.flink.cdc.impl.RowDataTransformerMapper; import com.qlangtech.tis.realtime.dto.DTOStream; import com.qlangtech.tis.realtime.transfer.DTO; +import com.qlangtech.tis.util.IPluginContext; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.table.connector.sink.SinkFunctionProvider; @@ -105,7 +106,7 @@ public RowDataSinkFunc(IDataXNameAware dataXName, TableAlias tab , boolean supportUpset, int sinkTaskParallelism, IFlinkColCreator flinkColCreator) { super(tab, primaryKeys, sinkFunction, sourceColsMeta, sinkColsMeta, sinkTaskParallelism); this.transformers - = Optional.ofNullable(RecordTransformerRules.loadTransformerRules(dataXName, tab.getFrom())); + = Optional.ofNullable(RecordTransformerRules.loadTransformerRules(IPluginContext.namedContext(dataXName.getTISDataXName()), tab.getFrom())); this.flinkColCreator = Objects.requireNonNull(flinkColCreator, "flinkColCreator can not be null"); if (supportUpset) { this.setSourceFilter("skipUpdateBeforeEvent" diff --git a/tis-incr/tis-realtime-flink/src/main/java/com/qlangtech/tis/realtime/TableRegisterFlinkSourceHandle.java b/tis-incr/tis-realtime-flink/src/main/java/com/qlangtech/tis/realtime/TableRegisterFlinkSourceHandle.java index 962d1e247..8bacbcc97 100644 --- a/tis-incr/tis-realtime-flink/src/main/java/com/qlangtech/tis/realtime/TableRegisterFlinkSourceHandle.java +++ b/tis-incr/tis-realtime-flink/src/main/java/com/qlangtech/tis/realtime/TableRegisterFlinkSourceHandle.java @@ -42,6 +42,7 @@ import com.qlangtech.tis.realtime.dto.DTOStream; import com.qlangtech.tis.realtime.transfer.DTO; import com.qlangtech.tis.sql.parser.tuple.creator.IStreamIncrGenerateStrategy; +import com.qlangtech.tis.util.IPluginContext; import org.apache.commons.lang3.tuple.Pair; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -208,7 +209,7 @@ protected void registerSourceTable(StreamTableEnvironment tabEnv 添加transformer执行逻辑 */ Optional transformers - = Optional.ofNullable(RecordTransformerRules.loadTransformerRules(this, tabName)); + = Optional.ofNullable(RecordTransformerRules.loadTransformerRules(IPluginContext.namedContext(this.getCollectionName()), tabName)); RecordTransformerRules tRules = null; RowTransformerMapper transformerMapper = null; List cols = null; diff --git a/tis-k8s-plugin/src/main/java/com/qlangtech/tis/plugin/datax/powerjob/impl/serverport/NodePort.java b/tis-k8s-plugin/src/main/java/com/qlangtech/tis/plugin/datax/powerjob/impl/serverport/NodePort.java index 0d4bc5048..5336c47c1 100644 --- a/tis-k8s-plugin/src/main/java/com/qlangtech/tis/plugin/datax/powerjob/impl/serverport/NodePort.java +++ b/tis-k8s-plugin/src/main/java/com/qlangtech/tis/plugin/datax/powerjob/impl/serverport/NodePort.java @@ -53,7 +53,7 @@ public class NodePort extends ServerPortExport { @Override protected Integer getExportPort() { - return this.nodePort; + return this.serverPort; } @Override diff --git a/tis-k8s-plugin/src/main/java/com/qlangtech/tis/plugin/k8s/K8SUtils.java b/tis-k8s-plugin/src/main/java/com/qlangtech/tis/plugin/k8s/K8SUtils.java index f3c3a24ca..b3752bd94 100644 --- a/tis-k8s-plugin/src/main/java/com/qlangtech/tis/plugin/k8s/K8SUtils.java +++ b/tis-k8s-plugin/src/main/java/com/qlangtech/tis/plugin/k8s/K8SUtils.java @@ -167,16 +167,13 @@ public static ServiceResName createService(final CoreV1Api api, String namespace V1ObjectMeta meta = new V1ObjectMeta(); meta.setName(svcRes.getName()); - // V1OwnerReference ownerRef = createOwnerReference(); List ownerRefs = Collections.singletonList(ownerRef.orElseGet(() -> createOwnerReference())); meta.setOwnerReferences(ownerRefs); svcBody.setMetadata(meta); - // V1ServiceSpec svcSpec = specCreator.get().getKey();// new V1ServiceSpec(); - //svcSpec.setType("ClusterIP"); svcSpec.setSelector(Collections.singletonMap(K8SUtils.LABEL_APP, selector.getK8SResName())); - // V1ServicePort svcPort = specCreator.get().getRight();// new V1ServicePort(); + svcPort.setName(targetPortName); svcPort.setTargetPort(targetPort); svcPort.setPort(exportPort); @@ -207,7 +204,7 @@ public static V1OwnerReference createOwnerReference(V1ReplicationController rc) return createOwnerReference(metadata.getUid(), metadata.getName()); } - private static V1OwnerReference createOwnerReference(String ownerId, String ownerName) { + public static V1OwnerReference createOwnerReference(String ownerId, String ownerName) { V1OwnerReference ownerRef = new V1OwnerReference(); ownerRef.setUid(ownerId); ownerRef.setName(ownerName); @@ -591,10 +588,10 @@ public static WaitReplicaControllerLaunch waitReplicaControllerLaunch(DefaultK8S * 在Waiting等待过程中是否要获取已有的Pods,在scala pods避免要出现刚添加的pod,随即马上去掉,此时程序识别成又添加了一个pod的情况 */ V1PodList pods = targetResName.setFieldSelector( - api - .listNamespacedPod(powerjobServerImage.getNamespace()) - .resourceVersion(resourceVer.getPreListPodsResourceVersion()) - ) + api + .listNamespacedPod(powerjobServerImage.getNamespace()) + .resourceVersion(resourceVer.getPreListPodsResourceVersion()) + ) .execute(); for (V1Pod pod : pods.getItems()) { @@ -626,10 +623,10 @@ public static WaitReplicaControllerLaunch waitReplicaControllerLaunch(DefaultK8S // , targetResName.setFieldSelector( - api.listNamespacedPod(powerjobServerImage.getNamespace()) - .allowWatchBookmarks(false) - .watch(true) - .resourceVersion(currentResVer)) + api.listNamespacedPod(powerjobServerImage.getNamespace()) + .allowWatchBookmarks(false) + .watch(true) + .resourceVersion(currentResVer)) .buildCall(K8SUtils.createApiCallback()) // diff --git a/tis-k8s-plugin/src/test/java/com/qlangtech/tis/plugin/datax/powerjob/impl/serverport/TestNodePort.java b/tis-k8s-plugin/src/test/java/com/qlangtech/tis/plugin/datax/powerjob/impl/serverport/TestNodePort.java new file mode 100644 index 000000000..6d108a0dc --- /dev/null +++ b/tis-k8s-plugin/src/test/java/com/qlangtech/tis/plugin/datax/powerjob/impl/serverport/TestNodePort.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.qlangtech.tis.plugin.datax.powerjob.impl.serverport; + +import com.qlangtech.tis.coredefine.module.action.TargetResName; +import com.qlangtech.tis.datax.job.SSERunnable; +import com.qlangtech.tis.datax.job.ServiceResName; +import com.qlangtech.tis.plugin.k8s.K8SUtils; +import io.kubernetes.client.openapi.apis.CoreV1Api; +import io.kubernetes.client.openapi.models.V1OwnerReference; +import junit.framework.TestCase; +import org.apache.commons.lang3.tuple.Pair; +import org.easymock.EasyMock; + +import java.util.Optional; + +import static com.qlangtech.tis.plugin.datax.powerjob.K8SDataXPowerJobServer.K8S_DATAX_POWERJOB_SERVER_NODE_PORT_SERVICE; + +/** + * @author: 百岁(baisui@qlangtech.com) + * @create: 2024-06-26 09:25 + **/ +public class TestNodePort extends TestCase { + @Override + protected void setUp() throws Exception { + super.setUp(); + SSERunnable.setLocalThread(SSERunnable.createMock()); + } + + public void testExportPort() throws Exception { + NodePort nodePort = new NodePort(); + nodePort.serverPort = 7700; + nodePort.host = "192.168.203.175"; + nodePort.nodePort = 31000; + // K8S_DATAX_POWERJOB_SERVER_NODE_PORT_SERVICE + + CoreV1Api api = EasyMock.mock("api", CoreV1Api.class); + +// String nameSpace, CoreV1Api api, String targetPortName +// , Pair serviceResAndOwner, Optional ownerRef + + TargetResName resOwner = new TargetResName("test"); + + EasyMock.replay(api); + + V1OwnerReference ownerReference = K8SUtils.createOwnerReference("testOwnerId", "testOwnerName"); + + nodePort.exportPort("tis", api, "targetPortName" + , Pair.of(K8S_DATAX_POWERJOB_SERVER_NODE_PORT_SERVICE, resOwner), Optional.of(ownerReference)); + + EasyMock.verify(api); + } +} diff --git a/tis-transformer/src/main/java/com/qlangtech/tis/plugin/datax/transformer/impl/AbstractFromColumnUDFDefinition.java b/tis-transformer/src/main/java/com/qlangtech/tis/plugin/datax/transformer/impl/AbstractFromColumnUDFDefinition.java index bc8c7e011..10f68ed23 100644 --- a/tis-transformer/src/main/java/com/qlangtech/tis/plugin/datax/transformer/impl/AbstractFromColumnUDFDefinition.java +++ b/tis-transformer/src/main/java/com/qlangtech/tis/plugin/datax/transformer/impl/AbstractFromColumnUDFDefinition.java @@ -19,17 +19,13 @@ package com.qlangtech.tis.plugin.datax.transformer.impl; import com.google.common.collect.Lists; -import com.qlangtech.tis.plugin.IdentityName; import com.qlangtech.tis.plugin.annotation.FormField; import com.qlangtech.tis.plugin.annotation.FormFieldType; import com.qlangtech.tis.plugin.annotation.Validator; -import com.qlangtech.tis.plugin.datax.SelectedTab; import com.qlangtech.tis.plugin.datax.transformer.UDFDefinition; import com.qlangtech.tis.plugin.datax.transformer.UDFDesc; -import com.qlangtech.tis.plugin.ds.CMeta; import java.util.List; -import java.util.stream.Collectors; /** * @@ -45,9 +41,6 @@ public List getLiteria() { } - public static List colsCandidate() { - List colsCandidate = SelectedTab.getColsCandidate(); - return colsCandidate.stream().collect(Collectors.toList()); - } + } diff --git a/tis-transformer/src/main/java/com/qlangtech/tis/plugin/datax/transformer/impl/ConcatUDF.java b/tis-transformer/src/main/java/com/qlangtech/tis/plugin/datax/transformer/impl/ConcatUDF.java index d411151e6..c30003ef4 100644 --- a/tis-transformer/src/main/java/com/qlangtech/tis/plugin/datax/transformer/impl/ConcatUDF.java +++ b/tis-transformer/src/main/java/com/qlangtech/tis/plugin/datax/transformer/impl/ConcatUDF.java @@ -56,6 +56,7 @@ public class ConcatUDF extends UDFDefinition { /** * 取得可用的字段分隔符 + * * @return */ public static List