Skip to content

Commit

Permalink
add transformer for DataX config file writer cols append with transformer cols
Browse files Browse the repository at this point in the history
  • Loading branch information
baisui1981 committed Jun 26, 2024
1 parent 6823bfc commit ac76375
Show file tree
Hide file tree
Showing 43 changed files with 280 additions and 194 deletions.
Original file line number Diff line number Diff line change
@@ -1,33 +1,33 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.qlangtech.tis.plugin.datax;

import com.google.common.collect.Lists;
import com.qlangtech.tis.datax.IDataxContext;
import com.qlangtech.tis.datax.IDataxProcessor;
import com.qlangtech.tis.plugin.ds.CMeta;
import com.qlangtech.tis.plugin.ds.ISelectedTab;
import com.qlangtech.tis.plugin.ds.ISelectedTab;
import com.qlangtech.tis.plugin.datax.transformer.RecordTransformerRules;
import com.qlangtech.tis.plugin.ds.IColMetaGetter;
import com.qlangtech.tis.plugin.ds.cassandra.CassandraDatasourceFactory;
import org.apache.commons.lang.StringUtils;

import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

/**
Expand All @@ -38,11 +38,15 @@ public class CassandraWriterContext implements IDataxContext {
private final DataXCassandraWriter writer;
private final IDataxProcessor.TableMap tabMapper;
private final CassandraDatasourceFactory dsFactory;
private final List<IColMetaGetter> cols;

/**
 * Builds the writer context for one table mapping.
 *
 * <p>NOTE(review): this text is a commit diff, so two signature lines appear back to back;
 * the two-argument form looks like the removed line and the three-argument form its
 * replacement -- confirm against the repository.
 *
 * @param writer           writer plugin; its {@code getDataSourceFactory()} result is kept in {@code dsFactory}
 * @param tabMapper        table mapping that supplies the source columns
 * @param transformerRules when present, {@code overwriteCols(...)} is applied to the source columns
 *                         and its result becomes {@code cols}; when empty, the source columns are
 *                         copied into a new list
 */
public CassandraWriterContext(DataXCassandraWriter writer, IDataxProcessor.TableMap tabMapper) {
public CassandraWriterContext(DataXCassandraWriter writer, IDataxProcessor.TableMap tabMapper, Optional<RecordTransformerRules> transformerRules) {
this.writer = writer;
this.tabMapper = tabMapper;
this.dsFactory = writer.getDataSourceFactory();

// Resolve the column list once at construction time so getters can return it directly.
this.cols = transformerRules.map((rule) -> rule.overwriteCols(tabMapper.getSourceCols()))
.orElseGet(() -> tabMapper.getSourceCols().stream().collect(Collectors.toList()));
}

public String getKeyspace() {
Expand All @@ -61,8 +65,9 @@ public String getTable() {
return this.tabMapper.getTo();
}

/**
 * Returns the columns held by this context.
 *
 * <p>NOTE(review): the diff shows both variants; the {@code List<CMeta>} form reading
 * {@code tabMapper.getSourceCols()} appears to be the removed one, and the
 * {@code List<IColMetaGetter>} form returning the {@code cols} field its replacement.
 * The field is returned as-is, without a defensive copy.
 *
 * @return the {@code cols} list computed in the constructor
 */
public List<CMeta> getColumn() {
return this.tabMapper.getSourceCols();
public List<IColMetaGetter> getColumn() {
return cols;
// return this.tabMapper.getSourceCols();
}

public String getHost() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.qlangtech.tis.plugin.annotation.FormField;
import com.qlangtech.tis.plugin.annotation.FormFieldType;
import com.qlangtech.tis.plugin.annotation.Validator;
import com.qlangtech.tis.plugin.datax.transformer.RecordTransformerRules;
import com.qlangtech.tis.plugin.ds.PostedDSProp;
import com.qlangtech.tis.plugin.ds.cassandra.CassandraDatasourceFactory;

Expand Down Expand Up @@ -84,11 +85,11 @@ public String getTemplate() {
}

/**
 * Creates the {@link CassandraWriterContext} for the given table mapping.
 *
 * <p>NOTE(review): the diff interleaves the old one-argument signature and constructor call
 * with the new forms that also pass {@code transformerRules} -- confirm which lines survive.
 *
 * @param tableMap         table mapping; must be present
 * @param transformerRules optional transformer rules, forwarded unchanged to the context constructor
 * @return a new {@code CassandraWriterContext} bound to this writer
 * @throws IllegalArgumentException if {@code tableMap} is empty
 */
@Override
public IDataxContext getSubTask(Optional<IDataxProcessor.TableMap> tableMap) {
public IDataxContext getSubTask(Optional<IDataxProcessor.TableMap> tableMap, Optional<RecordTransformerRules> transformerRules) {
// Fail fast with a descriptive message rather than letting Optional.get() throw.
if (!tableMap.isPresent()) {
throw new IllegalArgumentException("param tableMap shall be present");
}
return new CassandraWriterContext(this, tableMap.get());
return new CassandraWriterContext(this, tableMap.get(),transformerRules);
}

public CassandraDatasourceFactory getDataSourceFactory() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ public ClickHouseDataSourceFactory getDataSourceFactory() {
}

@Override
public IDataxContext getSubTask(Optional<IDataxProcessor.TableMap> tableMap) {
public IDataxContext getSubTask(Optional<IDataxProcessor.TableMap> tableMap, Optional<RecordTransformerRules> transformerRules) {
if (!tableMap.isPresent()) {
throw new IllegalArgumentException("tableMap shall be present");
}
Expand Down Expand Up @@ -228,7 +228,7 @@ public IDataxContext getSubTask(Optional<IDataxProcessor.TableMap> tableMap) {
// context.setPostSql(helper.replacePlaceholders(this.postSql, resolver));
context.setPostSql(this.postSql);
}
context.setCols(IDataxProcessor.TabCols.create(ds, tableMap.get()));
context.setCols(IDataxProcessor.TabCols.create(ds, tableMap.get(), transformerRules));
return context;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package com.qlangtech.tis.plugin.datax;

import com.google.common.collect.Lists;
import com.qlangtech.tis.datax.DataXCfgFile;
import com.qlangtech.tis.datax.IDataxProcessor;
import com.qlangtech.tis.datax.impl.DataxProcessor;
import com.qlangtech.tis.datax.impl.DataxWriter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import com.qlangtech.tis.runtime.module.misc.IControlMsgHandler;
import com.qlangtech.tis.runtime.module.misc.IFieldErrorHandler;
import com.qlangtech.tis.sql.parser.tuple.creator.EntityName;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -92,12 +93,19 @@ public final List<SelectedTab> getSelectedTabs() {
if (this.preSelectedTabsHash == selectedTabs.hashCode()) {
return selectedTabs;
}
boolean shallFillSelectedTabMeta = shallFillSelectedTabMeta();
this.selectedTabs = fillSelectedTabMeta(this.selectedTabs);
this.preSelectedTabsHash = selectedTabs.hashCode();
return this.selectedTabs;

}

@Override
public List<SelectedTab> fillSelectedTabMeta(List<SelectedTab> tabs) {
boolean shallFillSelectedTabMeta = shallFillSelectedTabMeta();

if (shallFillSelectedTabMeta) {
try (TableColsMeta tabsMeta = getTabsMeta()) {
this.selectedTabs = this.selectedTabs.stream().map((tab) -> {
return tabs.stream().map((tab) -> {
ColumnMetaData.fillSelectedTabMeta(tab, (t) -> {
return tabsMeta.get(t.getName());
});
Expand All @@ -107,12 +115,13 @@ public final List<SelectedTab> getSelectedTabs() {
throw new RuntimeException(e);
}
}
this.preSelectedTabsHash = selectedTabs.hashCode();
return this.selectedTabs;

return tabs;
}

protected boolean shallFillSelectedTabMeta() {
if (CollectionUtils.isEmpty(this.selectedTabs)) {
return true;
}
for (SelectedTab tab : this.selectedTabs) {
for (CMeta c : tab.cols) {
return (c.getType() == null);
Expand All @@ -123,6 +132,7 @@ protected boolean shallFillSelectedTabMeta() {

protected abstract RdbmsReaderContext createDataXReaderContext(String jobName, SelectedTab tab,
IDataSourceDumper dumper);

@Override
public void setKey(KeyedPluginStore.Key key) {
this.dataXName = key.keyVal.getVal();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import com.qlangtech.tis.datax.IDataxContext;
import com.qlangtech.tis.datax.IDataxProcessor;
import com.qlangtech.tis.datax.IDataxProcessor.TabCols;
import com.qlangtech.tis.plugin.datax.transformer.RecordTransformerRules;
import com.qlangtech.tis.plugin.ds.BasicDataSourceFactory;
import org.apache.commons.lang.StringUtils;

Expand All @@ -35,10 +37,12 @@ public abstract class RdbmsWriterContext<WRITER extends BasicDataXRdbmsWriter, D
extends BasicRdbmsContext<WRITER, DS> implements IDataxContext {
private final String tableName;

/**
 * Initializes the RDBMS writer context: passes the writer and its data source factory to the
 * superclass, records the target table name from {@code tabMapper.getTo()}, and sets the
 * column list.
 *
 * <p>NOTE(review): this is a diff, so old and new lines are interleaved. The old form set the
 * columns to the source column names; the new form builds a {@code TabCols} via
 * {@code TabCols.create(dsFactory, tabMapper, transformerRules)} and passes
 * {@code getRawCols()} to {@code setCols}. {@code dsFactory} is not declared in the visible
 * lines -- presumably inherited from {@code BasicRdbmsContext}; verify.
 *
 * @param writer           writer plugin; its data source factory is cast to {@code DS} (unchecked)
 * @param tabMapper        table mapping supplying the target table name and source columns
 * @param transformerRules optional transformer rules forwarded to {@code TabCols.create}
 */
public RdbmsWriterContext(WRITER writer, IDataxProcessor.TableMap tabMapper) {
public RdbmsWriterContext(WRITER writer, IDataxProcessor.TableMap tabMapper, Optional< RecordTransformerRules> transformerRules) {
super(writer, (DS) writer.getDataSourceFactory());
this.tableName = tabMapper.getTo();
this.setCols(tabMapper.getSourceCols().stream().map((c) -> c.getName()).collect(Collectors.toList()));
TabCols tabCols = TabCols.create(dsFactory, tabMapper, transformerRules);
// this.setCols(tabMapper.getSourceCols().stream().map((c) -> c.getName()).collect(Collectors.toList()));
this.setCols(tabCols.getRawCols());
}

public String getTableName() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public static String getDftTemplate() {


@Override
public IDataxContext getSubTask(Optional<IDataxProcessor.TableMap> tableMap) {
public IDataxContext getSubTask(Optional<IDataxProcessor.TableMap> tableMap, Optional<RecordTransformerRules> transformerRules) {
if (!tableMap.isPresent()) {
throw new IllegalArgumentException("param tableMap shall be present");
}
Expand All @@ -68,7 +68,7 @@ public IDataxContext getSubTask(Optional<IDataxProcessor.TableMap> tableMap) {
context.setUsername(dsFactory.userName);
context.setTabName(table.getTableName());
context.cols = IDataxProcessor.TabCols.create(new IDBReservedKeys() {
}, tm);
}, tm, transformerRules);
context.setDbName(this.dbName);
// context.writeMode = this.writeMode;
context.setPreSql(this.preSql);
Expand Down Expand Up @@ -151,7 +151,7 @@ public CreateTableSqlBuilder.CreateDDL generateCreateDDL(IDataxProcessor.TableMa

CreateTableSqlBuilder.CreateDDL createDDL = null;

final CreateTableSqlBuilder createTableSqlBuilder = new CreateTableSqlBuilder(tableMapper, this.getDataSourceFactory(),transformers) {
final CreateTableSqlBuilder createTableSqlBuilder = new CreateTableSqlBuilder(tableMapper, this.getDataSourceFactory(), transformers) {
@Override
protected void appendExtraColDef(List<String> pks) {
if (CollectionUtils.isEmpty(pks)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ public void testTempateGenerate() throws Exception {
private void validateConfigGenerate(String assertFileName, DataXDaMengWriter mySQLWriter) throws IOException {

Optional<IDataxProcessor.TableMap> tableMap = TestSelectedTabs.createTableMapper();
IDataxContext subTaskCtx = mySQLWriter.getSubTask(tableMap);
IDataxContext subTaskCtx = mySQLWriter.getSubTask(tableMap,Optional.empty());
Assert.assertNotNull(subTaskCtx);

RdbmsDataxContext mySQLDataxContext = (RdbmsDataxContext) subTaskCtx;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import com.qlangtech.tis.datax.IDataXGenerateCfgs;
import com.qlangtech.tis.datax.IDataxContext;
import com.qlangtech.tis.datax.IDataxProcessor;
import com.qlangtech.tis.datax.impl.DataXCfgGenerator;
import com.qlangtech.tis.datax.impl.DataxWriter;
import com.qlangtech.tis.exec.ExecutePhaseRange;
import com.qlangtech.tis.exec.IExecChainContext;
Expand All @@ -40,6 +39,7 @@
import com.qlangtech.tis.plugin.annotation.Validator;
import com.qlangtech.tis.plugin.datax.format.FileFormat;
import com.qlangtech.tis.plugin.datax.meta.MetaDataWriter;
import com.qlangtech.tis.plugin.datax.transformer.RecordTransformerRules;
import com.qlangtech.tis.plugin.ds.ISelectedTab;
import com.qlangtech.tis.plugin.tdfs.ITDFSSession;
import com.qlangtech.tis.plugin.tdfs.TDFSLinker;
Expand Down Expand Up @@ -118,14 +118,14 @@ public String getTemplate() {
}

/**
 * Creates the {@link DataXDFSWriterContext} for the given table mapping.
 *
 * <p>NOTE(review): {@code tableMap.get()} is called without an {@code isPresent()} check, so an
 * empty Optional surfaces as {@code NoSuchElementException}. Other {@code getSubTask}
 * implementations in this diff throw {@code IllegalArgumentException} with a descriptive
 * message instead -- consider aligning.
 *
 * <p>NOTE(review): the diff interleaves the old one-argument signature and constructor call
 * with the new forms that also pass {@code transformerRules}.
 *
 * @param tableMap         table mapping; expected to be present
 * @param transformerRules optional transformer rules, forwarded unchanged to the context constructor
 * @return a new {@code DataXDFSWriterContext} bound to this writer
 */
@Override
public IDataxContext getSubTask(Optional<IDataxProcessor.TableMap> tableMap) {
DataXDFSWriterContext writerContext = new DataXDFSWriterContext(this, tableMap.get());
public IDataxContext getSubTask(Optional<IDataxProcessor.TableMap> tableMap, Optional<RecordTransformerRules> transformerRules) {
DataXDFSWriterContext writerContext = new DataXDFSWriterContext(this, tableMap.get(), transformerRules);
return writerContext;
}


@TISExtension()
public static class DefaultDescriptor extends BaseDataxWriterDescriptor {
public static class DefaultDescriptor extends BaseDataxWriterDescriptor {
public DefaultDescriptor() {
super();
// registerSelectOptions(KEY_FTP_SERVER_LINK, () -> ParamsConfig.getItems(FTPServer.FTP_SERVER));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@

import com.qlangtech.tis.datax.IDataxContext;
import com.qlangtech.tis.datax.IDataxProcessor;
import com.qlangtech.tis.plugin.datax.transformer.RecordTransformerRules;
import com.qlangtech.tis.plugin.ds.CMeta;
import com.qlangtech.tis.plugin.ds.IColMetaGetter;
import org.apache.commons.lang.StringUtils;

import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;

/**
Expand All @@ -35,55 +38,20 @@ public class DataXDFSWriterContext implements IDataxContext {
private final DataXDFSWriter writer;
private final IDataxProcessor.TableMap tableMapper;
// private final FTPServer ftpServer;
private final List<IColMetaGetter> cols;

/**
 * Builds the DFS writer context for one table mapping.
 *
 * <p>NOTE(review): this text is a commit diff; the two-argument signature looks like the
 * removed line and the three-argument signature its replacement -- confirm.
 *
 * @param writer           writer plugin; its {@code fileFormat} and {@code dfsLinker} must be non-null
 * @param tableMapper      table mapping that supplies the source columns
 * @param transformerRules when present, {@code overwriteCols(...)} is applied to the source columns
 *                         and its result becomes {@code cols}; when empty, the source columns are
 *                         copied into a new list
 * @throws NullPointerException if {@code writer.fileFormat} or {@code writer.dfsLinker} is null
 */
public DataXDFSWriterContext(DataXDFSWriter writer, IDataxProcessor.TableMap tableMapper) {
public DataXDFSWriterContext(DataXDFSWriter writer, IDataxProcessor.TableMap tableMapper, Optional<RecordTransformerRules> transformerRules) {
this.writer = writer;
// Validate required writer properties up front so later getters can dereference them safely.
Objects.requireNonNull(writer.fileFormat, "prop fileFormat can not be null");
Objects.requireNonNull(writer.dfsLinker, "prop linker can not be null");
this.tableMapper = tableMapper;

// Resolve the column list once; isContainHeader()/getHeader() read this field.
this.cols = transformerRules.map((rule) -> rule.overwriteCols(tableMapper.getSourceCols()))
.orElseGet(() -> tableMapper.getSourceCols().stream().collect(Collectors.toList()));

// this.ftpServer = FTPServer.getServer(writer.linker);
}

// public String getProtocol() {
// return this.ftpServer.protocol;
// }
//
// public String getHost() {
// return this.ftpServer.host;
// }
//
// public boolean isContainPort() {
// return this.ftpServer.port != null;
// }
//
// public Integer getPort() {
// return this.ftpServer.port;
// }
//
// public boolean isContainTimeout() {
// return this.ftpServer.timeout != null;
// }
//
// public Integer getTimeout() {
// return this.ftpServer.timeout;
// }
//
// public String getUsername() {
// return this.ftpServer.username;
// }
//
// public String getPassword() {
// return this.ftpServer.password;
// }
//
// public boolean isContainConnectPattern() {
// return StringUtils.isNotBlank(this.ftpServer.connectPattern);
// }
//
// public String getConnectPattern() {
// return this.ftpServer.connectPattern;
// }
//
public String getPath() {
String path = null;
if (StringUtils.isEmpty(path = this.writer.dfsLinker.getRootPath())) {
Expand Down Expand Up @@ -113,7 +81,7 @@ public boolean isContainEncoding() {
}

/**
 * Not supported by this context: every call throws.
 *
 * <p>The commented-out lines show a former implementation that returned the writer's
 * {@code encoding} property.
 *
 * @return never returns normally
 * @throws UnsupportedOperationException always
 */
public String getEncoding() {
// return this.writer.encoding;
// return this.writer.encoding;
throw new UnsupportedOperationException();
}

Expand Down Expand Up @@ -162,12 +130,12 @@ public String getSuffix() {
}

/**
 * Reports whether a header line should be produced.
 *
 * <p>NOTE(review): the diff shows both the old local {@code cols} declaration and its
 * commented-out replacement; in the new form {@code cols} presumably resolves to the
 * {@code cols} field -- confirm.
 *
 * @return {@code true} when the writer's file format reports {@code containHeader()} and
 *         there is at least one column
 */
public boolean isContainHeader() {
List<CMeta> cols = tableMapper.getSourceCols();
// List<CMeta> cols = tableMapper.getSourceCols();
return (this.writer.fileFormat.containHeader() && cols.size() > 0);
}

/**
 * Builds the header text: each column name wrapped in single quotes, joined with commas
 * (for example {@code 'id','name'}).
 *
 * <p>NOTE(review): the diff shows both variants; the one streaming
 * {@code tableMapper.getSourceCols()} appears to be the removed line and the one streaming
 * the {@code cols} field its replacement. Column names are not escaped, so a name containing
 * a single quote would be emitted verbatim.
 *
 * @return comma-separated, single-quoted column names; an empty string when there are no columns
 */
public String getHeader() {
return tableMapper.getSourceCols().stream().map((c) -> "'" + c.getName() + "'").collect(Collectors.joining(","));
return this.cols.stream().map((c) -> "'" + c.getName() + "'").collect(Collectors.joining(","));
}

}
Loading

0 comments on commit ac76375

Please sign in to comment.