Skip to content

Commit

Permalink
add transformer for Row based for auto DDL create logic
Browse files Browse the repository at this point in the history
  • Loading branch information
baisui1981 committed Jun 22, 2024
1 parent 832622b commit 6823bfc
Show file tree
Hide file tree
Showing 41 changed files with 286 additions and 228 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@
import com.qlangtech.tis.plugin.annotation.FormFieldType;
import com.qlangtech.tis.plugin.annotation.Validator;
import com.qlangtech.tis.plugin.datax.common.BasicDataXRdbmsWriter;
import com.qlangtech.tis.plugin.datax.transformer.RecordTransformerRules;
import com.qlangtech.tis.plugin.ds.CMeta;
import com.qlangtech.tis.plugin.ds.DataType;
import com.qlangtech.tis.plugin.ds.IColMetaGetter;
import com.qlangtech.tis.plugin.ds.clickhouse.ClickHouseDataSourceFactory;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
Expand Down Expand Up @@ -70,18 +72,18 @@ public class DataXClickhouseWriter extends BasicDataXRdbmsWriter<ClickHouseDataS
// }

@Override
public CreateTableSqlBuilder.CreateDDL generateCreateDDL(IDataxProcessor.TableMap tableMapper) {
public CreateTableSqlBuilder.CreateDDL generateCreateDDL(IDataxProcessor.TableMap tableMapper, Optional<RecordTransformerRules> transformers) {
// if (!this.autoCreateTable) {
// return null;
// }
final CreateTableSqlBuilder createTableSqlBuilder = new CreateTableSqlBuilder(tableMapper, this.getDataSourceFactory()) {
final CreateTableSqlBuilder createTableSqlBuilder = new CreateTableSqlBuilder(tableMapper, this.getDataSourceFactory(), transformers) {
@Override
protected void appendExtraColDef(List<String> pks) {
script.append(" ," + wrapWithEscape(ClickHouseCommon.KEY_CLICKHOUSE_CK) + " Int8 DEFAULT 1").append("\n");
}

@Override
protected ColWrapper createColWrapper(CMeta c) {
protected ColWrapper createColWrapper(IColMetaGetter c) {
return new ColWrapper(c) {
@Override
public String getMapperType() {
Expand All @@ -99,7 +101,7 @@ protected void appendTabMeta(List<String> pk) {
script.append(" SETTINGS index_granularity = 8192");
}

private String convertType(CMeta col) {
private String convertType(IColMetaGetter col) {
DataType type = col.getType();
switch (type.getJdbcType()) {
case INTEGER:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ public void testRealDump() throws Exception {
};
EasyMock.replay(dataXProcessor);
DataXClickhouseWriter writer = new DataXClickhouseWriter();
WriterTemplate.realExecuteDump(DataXCfgJson.path(TestDataXClickhouseWriter.class,clickhouse_datax_writer_assert_without_optional), writer);
WriterTemplate.realExecuteDump(DataXCfgJson.path(TestDataXClickhouseWriter.class, clickhouse_datax_writer_assert_without_optional), writer);

EasyMock.verify(dataXProcessor);
} finally {
Expand All @@ -205,11 +205,11 @@ public void testGenerateCreateDDL() {

ClickHouseTest dataXWriter = createDataXWriter();

CreateTableSqlBuilder.CreateDDL createDDL = dataXWriter.writer.generateCreateDDL(dataXWriter.tableMap);
CreateTableSqlBuilder.CreateDDL createDDL = dataXWriter.writer.generateCreateDDL(dataXWriter.tableMap, Optional.empty());
assertNull(createDDL);

dataXWriter.writer.autoCreateTable = true;
createDDL = dataXWriter.writer.generateCreateDDL(dataXWriter.tableMap);
createDDL = dataXWriter.writer.generateCreateDDL(dataXWriter.tableMap, Optional.empty());
assertNotNull(createDDL);

assertEquals("CREATE TABLE customer_order_relation\n" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.common.collect.Lists;
import com.qlangtech.tis.TIS;
import com.qlangtech.tis.datax.DataXCfgFile;
import com.qlangtech.tis.datax.IDataXNameAware;
import com.qlangtech.tis.datax.IDataxProcessor;
import com.qlangtech.tis.datax.IDataxWriter;
import com.qlangtech.tis.datax.impl.DataxProcessor;
Expand Down Expand Up @@ -63,7 +64,7 @@
* @create: 2021-06-23 12:07
**/
public abstract class BasicDataXRdbmsWriter<DS extends DataSourceFactory> extends DataxWriter
implements IDataSourceFactoryGetter, IInitWriterTableExecutor, KeyedPluginStore.IPluginKeyAware {
implements IDataSourceFactoryGetter, IInitWriterTableExecutor, KeyedPluginStore.IPluginKeyAware, IDataXNameAware {
public static final String KEY_DB_NAME_FIELD_NAME = "dbName";
private static final Logger logger = LoggerFactory.getLogger(BasicDataXRdbmsWriter.class);

Expand Down Expand Up @@ -101,6 +102,11 @@ public boolean isGenerateCreateDDLSwitchOff() {

public transient String dataXName;

@Override
public final String getCollectionName() {
return this.dataXName;
}

@Override
public Integer getRowFetchSize() {
throw new UnsupportedOperationException("just support in DataX Reader");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@
import com.qlangtech.tis.plugin.datax.common.BasicDataXRdbmsWriter;
import com.qlangtech.tis.plugin.datax.dameng.ds.DaMengDataSourceFactory;
import com.qlangtech.tis.plugin.datax.dameng.reader.DataXDaMengReader;
import com.qlangtech.tis.plugin.datax.transformer.RecordTransformerRules;
import com.qlangtech.tis.plugin.ds.CMeta;
import com.qlangtech.tis.plugin.ds.DataDumpers;
import com.qlangtech.tis.plugin.ds.DataType;
import com.qlangtech.tis.plugin.ds.IColMetaGetter;
import com.qlangtech.tis.plugin.ds.IDBReservedKeys;
import com.qlangtech.tis.plugin.ds.IDataSourceDumper;
import com.qlangtech.tis.plugin.ds.TISTable;
Expand Down Expand Up @@ -88,7 +90,7 @@ public IDataxContext getSubTask(Optional<IDataxProcessor.TableMap> tableMap) {
* @return
*/
@Override
public CreateTableSqlBuilder.CreateDDL generateCreateDDL(IDataxProcessor.TableMap tableMapper) {
public CreateTableSqlBuilder.CreateDDL generateCreateDDL(IDataxProcessor.TableMap tableMapper, Optional<RecordTransformerRules> transformers) {
// if (!this.autoCreateTable) {
// return null;
// }
Expand Down Expand Up @@ -149,7 +151,7 @@ public CreateTableSqlBuilder.CreateDDL generateCreateDDL(IDataxProcessor.TableMa

CreateTableSqlBuilder.CreateDDL createDDL = null;

final CreateTableSqlBuilder createTableSqlBuilder = new CreateTableSqlBuilder(tableMapper, this.getDataSourceFactory()) {
final CreateTableSqlBuilder createTableSqlBuilder = new CreateTableSqlBuilder(tableMapper, this.getDataSourceFactory(),transformers) {
@Override
protected void appendExtraColDef(List<String> pks) {
if (CollectionUtils.isEmpty(pks)) {
Expand All @@ -165,7 +167,7 @@ protected void appendTabMeta(List<String> pks) {
}

@Override
protected ColWrapper createColWrapper(CMeta c) {
protected ColWrapper createColWrapper(IColMetaGetter c) {
return new ColWrapper(c) {
@Override
public String getMapperType() {
Expand All @@ -180,7 +182,7 @@ public String getMapperType() {
* @param col
* @return
*/
private String convertType(CMeta col) {
private String convertType(IColMetaGetter col) {
DataType type = col.getType();
switch (type.getJdbcType()) {
case CHAR: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@
import com.qlangtech.tis.plugin.annotation.Validator;
import com.qlangtech.tis.plugin.datax.CreateTableSqlBuilder;
import com.qlangtech.tis.plugin.datax.common.BasicDataXRdbmsWriter;
import com.qlangtech.tis.plugin.datax.transformer.RecordTransformerRules;
import com.qlangtech.tis.plugin.ds.CMeta;
import com.qlangtech.tis.plugin.ds.DataSourceMeta;
import com.qlangtech.tis.plugin.ds.DataType;
import com.qlangtech.tis.plugin.ds.DataTypeMeta;
import com.qlangtech.tis.plugin.ds.IColMetaGetter;
import com.qlangtech.tis.plugin.ds.ISelectedTab;
import com.qlangtech.tis.plugin.ds.doris.DorisSourceFactory;
import com.qlangtech.tis.runtime.module.misc.IFieldErrorHandler;
Expand Down Expand Up @@ -82,28 +84,31 @@ public interface Separator {


@Override
public final CreateTableSqlBuilder.CreateDDL generateCreateDDL(IDataxProcessor.TableMap tableMapper) {
public final CreateTableSqlBuilder.CreateDDL generateCreateDDL(IDataxProcessor.TableMap tableMapper, Optional<RecordTransformerRules> transformers) {
// if (!this.autoCreateTable) {
// return null;
// }
// https://doris.apache.org/docs/1.2/sql-manual/sql-reference/Data-Types/DATETIMEV2/
// https://doris.apache.org/docs/dev/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE
// https://docs.starrocks.io/zh-cn/2.4/sql-reference/sql-statements/data-definition/CREATE%20TABLE
final BasicCreateTableSqlBuilder createTableSqlBuilder = createSQLDDLBuilder(tableMapper);
final BasicCreateTableSqlBuilder createTableSqlBuilder = createSQLDDLBuilder(tableMapper, transformers);

return createTableSqlBuilder.build();
}

protected abstract BasicCreateTableSqlBuilder createSQLDDLBuilder(IDataxProcessor.TableMap tableMapper);
protected abstract BasicCreateTableSqlBuilder createSQLDDLBuilder(IDataxProcessor.TableMap tableMapper, Optional<RecordTransformerRules> transformers);


protected static abstract class BasicCreateTableSqlBuilder extends CreateTableSqlBuilder {
private final ISelectedTab dorisTab;
private final List<String> primaryKeys;
private final DataType.TypeVisitor<DorisType> columnTokenRecognise;

public BasicCreateTableSqlBuilder(IDataxProcessor.TableMap tableMapper, DataSourceMeta dsMeta, DataType.TypeVisitor<DorisType> columnTokenRecognise) {
super(tableMapper, dsMeta);
public BasicCreateTableSqlBuilder(IDataxProcessor.TableMap tableMapper
, DataSourceMeta dsMeta
, DataType.TypeVisitor<DorisType> columnTokenRecognise
, Optional<RecordTransformerRules> transformers) {
super(tableMapper, dsMeta, transformers);
// (DorisSelectedTab)
this.dorisTab = tableMapper.getSourceTab();
this.primaryKeys = this.dorisTab.getPrimaryKeys();
Expand All @@ -119,11 +124,11 @@ protected void appendExtraColDef(List<String> pks) {


@Override
protected List<ColWrapper> preProcessCols(List<String> pks, List<CMeta> cols) {
protected List<ColWrapper> preProcessCols(List<String> pks, List<IColMetaGetter> cols) {
// 将主键排在最前面
List<ColWrapper> result = Lists.newArrayList();
for (String pk : primaryKeys) {
for (CMeta c : cols) {
for (IColMetaGetter c : cols) {
if (pk.equalsIgnoreCase(c.getName())) {
result.add(createColWrapper(c));
}
Expand All @@ -146,8 +151,8 @@ protected void appendTabMeta(List<String> pks) {
if (pks.size() > 0) {
script.append(pks.stream().map((pk) -> wrapWithEscape(pk)).collect(Collectors.joining(",")));
} else {
List<CMeta> cols = this.getCols();
Optional<CMeta> firstCol = cols.stream().findFirst();
List<IColMetaGetter> cols = this.getCols();
Optional<IColMetaGetter> firstCol = cols.stream().findFirst();
if (firstCol.isPresent()) {
script.append(firstCol.get().getName());
} else {
Expand All @@ -158,35 +163,16 @@ protected void appendTabMeta(List<String> pks) {
script.append("BUCKETS 10\n");
StringBuffer seqBuffer = new StringBuffer();
if (dorisTab instanceof DorisSelectedTab) {
seqBuffer = ((DorisSelectedTab) dorisTab).seqKey.createDDLScript(this.tableMapper);
seqBuffer = ((DorisSelectedTab) dorisTab).seqKey.createDDLScript(this);
}
// StringBuffer seqBuffer = dorisTab// new StringBuffer();
// if (StringUtils.isNotEmpty(dorisTab.seqKey)) {
//
// List<CMeta> cols = this.tableMapper.getSourceCols();
// Optional<CMeta> p = cols.stream().filter((c) -> dorisTab.seqKey.equals(c.getName()))
// .findFirst();
// if (!p.isPresent()) {
// throw new IllegalStateException("can not find col:" + dorisTab.seqKey);
// }
//
//// seqBuffer.append("\n\t, \"function_column.sequence_col\" = '").append(dorisTab.seqKey)
//// .append("'\n\t, \"function_column.sequence_type\"='").append(createColWrapper
// (p.get()).getMapperType()).append("'");
//
// seqBuffer.append("\n\t, \"function_column.sequence_col\" = '").append(dorisTab.seqKey);
// // .append("'\n\t, \"function_column.sequence_type\"='").append(createColWrapper
// (p.get()).getMapperType()).append("'");
//
// }

script.append("PROPERTIES(\"replication_num\" = \"1\" " + seqBuffer + " )");


}

@Override
protected ColWrapper createColWrapper(CMeta c) {
protected ColWrapper createColWrapper(IColMetaGetter c) {
return new ColWrapper(c) {
@Override
public String getMapperType() {
Expand All @@ -206,7 +192,7 @@ protected void appendExtraConstraint(BlockScriptBuffer ddlScript) {
};
}

protected DorisType convertType(CMeta col) {
protected DorisType convertType(IColMetaGetter col) {
DataType type = col.getType();
return type.accept(columnTokenRecognise);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.qlangtech.tis.annotation.Public;
import com.qlangtech.tis.datax.IDataxContext;
import com.qlangtech.tis.datax.IDataxProcessor;
import com.qlangtech.tis.datax.IDataxProcessor.TableMap;
import com.qlangtech.tis.datax.impl.DataxWriter;
import com.qlangtech.tis.extension.Descriptor;
import com.qlangtech.tis.extension.ElementPluginDesc;
Expand All @@ -36,8 +37,10 @@
import com.qlangtech.tis.plugin.annotation.FormFieldType;
import com.qlangtech.tis.plugin.annotation.Validator;
import com.qlangtech.tis.plugin.datax.SelectedTab;
import com.qlangtech.tis.plugin.datax.transformer.RecordTransformerRules;
import com.qlangtech.tis.plugin.ds.CMeta;
import com.qlangtech.tis.plugin.ds.DataType;
import com.qlangtech.tis.plugin.ds.IColMetaGetter;
import com.qlangtech.tis.plugin.ds.doris.DorisSourceFactory;
import org.apache.commons.lang3.StringUtils;

Expand Down Expand Up @@ -102,16 +105,20 @@ protected String getDecimalToken() {
}
};



@Override
protected BasicCreateTableSqlBuilder createSQLDDLBuilder(IDataxProcessor.TableMap tableMapper) {
return new BasicCreateTableSqlBuilder(tableMapper, this.getDataSourceFactory(), columnTokenRecognise) {
protected BasicCreateTableSqlBuilder createSQLDDLBuilder(IDataxProcessor.TableMap tableMapper, Optional<RecordTransformerRules> transformers) {
return new BasicCreateTableSqlBuilder(tableMapper, this.getDataSourceFactory(), columnTokenRecognise, transformers) {


@Override
protected String getUniqueKeyToken() {
return createTableModel.getKeyToken();
}

@Override
protected DorisType convertType(CMeta col) {
protected DorisType convertType(IColMetaGetter col) {
DorisType type = super.convertType(col);
DorisType fixType = col.getType().accept(new DataType.TypeVisitor<DorisType>() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@
import com.qlangtech.tis.plugin.annotation.FormField;
import com.qlangtech.tis.plugin.annotation.FormFieldType;
import com.qlangtech.tis.plugin.annotation.Validator;
import com.qlangtech.tis.plugin.datax.AbstractCreateTableSqlBuilder;
import com.qlangtech.tis.plugin.datax.SelectedTab;
import com.qlangtech.tis.plugin.ds.CMeta;
import com.qlangtech.tis.plugin.ds.DataXReaderColType;
import com.qlangtech.tis.plugin.ds.IColMetaGetter;
import org.apache.commons.lang3.StringUtils;

import java.util.List;
Expand Down Expand Up @@ -61,7 +63,7 @@ public String getSeqColName() {
}

@Override
public StringBuffer createDDLScript(IDataxProcessor.TableMap tableMapper) {
public StringBuffer createDDLScript(AbstractCreateTableSqlBuilder tableMapper) {
// if (StringUtils.isNotEmpty(this.seqKey)) {
if (tableMapper == null) {
throw new IllegalArgumentException("param tableMapper can not be null");
Expand All @@ -70,8 +72,8 @@ public StringBuffer createDDLScript(IDataxProcessor.TableMap tableMapper) {
throw new IllegalArgumentException("param seqKey can not be null");
}
StringBuffer seqBuffer = new StringBuffer();
List<CMeta> cols = tableMapper.getSourceCols();
Optional<CMeta> p = cols.stream().filter((c) -> seqKey.equals(c.getName())).findFirst();
List<IColMetaGetter> cols = tableMapper.getCols();
Optional<IColMetaGetter> p = cols.stream().filter((c) -> seqKey.equals(c.getName())).findFirst();
if (!p.isPresent()) {
throw new IllegalStateException("can not find col:" + seqKey);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
package com.qlangtech.tis.plugin.datax.seq;

import com.alibaba.fastjson.JSONObject;
import com.qlangtech.tis.datax.IDataxProcessor;
import com.qlangtech.tis.extension.Describable;
import com.qlangtech.tis.plugin.datax.AbstractCreateTableSqlBuilder;

/**
* https://doris.apache.org/docs/dev/data-operate/update-delete/sequence-column-manual?_highlight=seq
Expand All @@ -30,7 +30,7 @@
**/
public abstract class SeqKey implements Describable<SeqKey> {

public StringBuffer createDDLScript(IDataxProcessor.TableMap tableMapper) {
public StringBuffer createDDLScript(AbstractCreateTableSqlBuilder tableMapper) {
return new StringBuffer();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Lists;
import com.qlangtech.tis.datax.DataXCfgFile;
import com.qlangtech.tis.datax.IDataxProcessor;
import com.qlangtech.tis.datax.impl.DataxProcessor;
import com.qlangtech.tis.datax.impl.DataxReader;
Expand Down Expand Up @@ -100,7 +101,7 @@ public void testGenerateCreateDDL() {
col.setName("col6");
col.setType(DataXReaderColType.STRING.dataType);
cols.add(col);
}));
}), Optional.empty());

assertNotNull(ddl);
// System.out.println(ddl);
Expand Down
Loading

0 comments on commit 6823bfc

Please sign in to comment.