Skip to content

Commit

Permalink
refactor create DDL process
Browse files Browse the repository at this point in the history
  • Loading branch information
baisui1981 committed Oct 11, 2024
1 parent 308a59a commit e7b453c
Show file tree
Hide file tree
Showing 16 changed files with 240 additions and 192 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,15 @@
import com.qlangtech.tis.plugin.datax.common.BasicDataXRdbmsWriter;
import com.qlangtech.tis.plugin.datax.transformer.RecordTransformerRules;
import com.qlangtech.tis.plugin.ds.CMeta;

import com.qlangtech.tis.plugin.ds.DataType;
import com.qlangtech.tis.plugin.ds.IColMetaGetter;
import com.qlangtech.tis.plugin.ds.clickhouse.ClickHouseDataSourceFactory;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;

import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -86,10 +87,10 @@ protected void appendExtraColDef(List<String> pks) {

@Override
protected ColWrapper createColWrapper(IColMetaGetter c) {
return new ColWrapper(c) {
return new ColWrapper(c, this.pks) {
@Override
public String getMapperType() {
return convertType(this.meta);
return convertType(this.getType());
}
};
}
Expand All @@ -103,9 +104,10 @@ protected void appendTabMeta(List<String> pk) {
script.append(" SETTINGS index_granularity = 8192");
}

private String convertType(IColMetaGetter col) {
DataType type = col.getType();
switch (type.getJdbcType()) {
private String convertType(DataType type) {

switch (Objects.requireNonNull(type, "type can not be null")
.getJdbcType()) {
case INTEGER:
case TINYINT:
case SMALLINT:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,10 +169,10 @@ protected void appendTabMeta(List<String> pks) {

@Override
protected ColWrapper createColWrapper(IColMetaGetter c) {
return new ColWrapper(c) {
return new ColWrapper(c,this.pks) {
@Override
public String getMapperType() {
return convertType(this.meta);
return convertType(this);
}
};
}
Expand All @@ -183,7 +183,7 @@ public String getMapperType() {
* @param col
* @return
*/
private String convertType(IColMetaGetter col) {
private String convertType(ColWrapper col) {
DataType type = col.getType();
switch (type.getJdbcType()) {
case CHAR: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ protected static class DorisColWrapper extends ColWrapper {
protected DorisType dorisType;
private final BasicCreateTableSqlBuilder sqlBuilder;

public DorisColWrapper(IColMetaGetter meta, BasicCreateTableSqlBuilder sqlBuilder) {
super(meta);
public DorisColWrapper(IColMetaGetter meta, List<String> pks, BasicCreateTableSqlBuilder sqlBuilder) {
super(meta, pks);
this.sqlBuilder = sqlBuilder;
this.dorisType = convertType(meta);
}
Expand All @@ -120,7 +120,7 @@ public final String getMapperType() {

@Override
protected final void appendExtraConstraint(BlockScriptBuffer ddlScript) {
if (sqlBuilder.isPK(this.meta.getName())) {
if (sqlBuilder.isPK(this.getName())) {
ddlScript.append(" NOT NULL");
}
}
Expand Down Expand Up @@ -154,10 +154,10 @@ public BasicCreateTableSqlBuilder(IDataxProcessor.TableMap tableMapper
this.dorisTab = tableMapper.getSourceTab();
this.primaryKeys = Lists.newArrayList();
for (String pk : this.dorisTab.getPrimaryKeys()) {
for (IColMetaGetter c : this.getCols()) {
for (DorisColWrapper c : this.getCols()) {
if (pk.equalsIgnoreCase(c.getName())) {
// result.add(createColWrapper(c));
this.primaryKeys.add(createColWrapper(c));
this.primaryKeys.add((c));
}
}
}
Expand All @@ -178,7 +178,7 @@ public boolean isPK(String colName) {
protected abstract String getUniqueKeyToken();

@Override
protected List<DorisColWrapper> preProcessCols(List<String> pks, List<IColMetaGetter> cols) {
protected List<DorisColWrapper> preProcessCols(List<String> pks, List<DorisColWrapper> cols) {
//return super.preProcessCols(pks, cols);

// 将主键排在最前面
Expand All @@ -187,7 +187,7 @@ protected List<DorisColWrapper> preProcessCols(List<String> pks, List<IColMetaGe


cols.stream().filter((c) -> !this.pks.contains(c.getName())).forEach((c) -> {
result.add(createColWrapper(c));
result.add((c));
});
return result;
}
Expand All @@ -204,8 +204,8 @@ protected void appendTabMeta(List<String> pks) {
if (pks.size() > 0) {
script.append(primaryKeys.stream().map((pk) -> wrapWithEscape(pk.getName())).collect(Collectors.joining(",")));
} else {
List<IColMetaGetter> cols = this.getCols();
Optional<IColMetaGetter> firstCol = cols.stream().findFirst();
List<DorisColWrapper> cols = this.getCols();
Optional<DorisColWrapper> firstCol = cols.stream().findFirst();
if (firstCol.isPresent()) {
script.append(firstCol.get().getName());
} else {
Expand All @@ -226,7 +226,8 @@ protected void appendTabMeta(List<String> pks) {

@Override
protected DorisColWrapper createColWrapper(IColMetaGetter c) {
return new DorisColWrapper(c, this);

return new DorisColWrapper(c, this.pks, this);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ protected String getUniqueKeyToken() {

@Override
protected DorisColWrapper createColWrapper(IColMetaGetter col) {
return new DorisColWrapper(col, this) {
return new DorisColWrapper(col, this.pks, this) {
@Override
protected DorisType convertType(IColMetaGetter col) {
DorisType type = super.convertType(col);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import com.qlangtech.tis.plugin.datax.odps.OdpsDataSourceFactory;
import com.qlangtech.tis.plugin.datax.transformer.RecordTransformerRules;
import com.qlangtech.tis.plugin.ds.CMeta;

import com.qlangtech.tis.plugin.ds.DataSourceMeta;
import com.qlangtech.tis.plugin.ds.DataType;
import com.qlangtech.tis.plugin.ds.IColMetaGetter;
Expand Down Expand Up @@ -234,7 +234,7 @@ protected void appendTabMeta(List<String> pks) {

@Override
protected ColWrapper createColWrapper(IColMetaGetter c) {
return new ColWrapper(c) {
return new ColWrapper(c, this.pks) {
@Override
public String getMapperType() {
return c.getType().accept(typeTransfer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,10 @@ protected void appendTabMeta(List<String> pks) {

@Override
protected ColWrapper createColWrapper(IColMetaGetter c) {
return new ColWrapper(c) {
return new ColWrapper(c,this.pks) {
@Override
public String getMapperType() {
return convertType(this.meta);
return convertType(this);
}
};
}
Expand All @@ -109,7 +109,7 @@ public String getMapperType() {
* @param col
* @return
*/
private String convertType(IColMetaGetter col) {
private String convertType(ColWrapper col) {
DataType type = col.getType();
switch (type.getJdbcType()) {
case CHAR: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,12 @@
import com.qlangtech.tis.datax.IDataxProcessor;
import com.qlangtech.tis.extension.TISExtension;
import com.qlangtech.tis.extension.impl.IOUtils;
import com.qlangtech.tis.plugin.datax.CreateTableSqlBuilder.ColWrapper;
import com.qlangtech.tis.plugin.datax.common.BasicDataXRdbmsWriter;
import com.qlangtech.tis.plugin.datax.transformer.RecordTransformerRules;
import com.qlangtech.tis.plugin.ds.CMeta;
import com.qlangtech.tis.plugin.ds.DataType;
import com.qlangtech.tis.plugin.ds.IColMetaGetter;
import com.qlangtech.tis.plugin.ds.postgresql.PGDataSourceFactory;
import org.apache.commons.lang.StringUtils;

import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;

/**
* @author: baisui 百岁
Expand All @@ -55,128 +48,9 @@ public static void main(String[] args) {

@Override
public CreateTableSqlBuilder.CreateDDL generateCreateDDL(IDataxProcessor.TableMap tableMapper, Optional<RecordTransformerRules> transformers) {
// if (!this.autoCreateTable) {
// return null;
// }

PGDataSourceFactory ds = this.getDataSourceFactory();
// 多个主键
boolean multiPk = Objects.requireNonNull(tableMapper.getSourceCols(), "sourceCols can not be null")
.stream().filter((col) -> col.isPk()).count() > 1;

final CreateTableSqlBuilder createTableSqlBuilder = new CreateTableSqlBuilder<ColWrapper>(tableMapper, ds, transformers) {
@Override
protected CreateTableName getCreateTableName() {
return new CreateTableName(ds.tabSchema, tableMapper.getTo(), this);
}

@Override
protected void appendExtraColDef(List<String> pks) {
// if (!pks.isEmpty()) {
// script.append(" PRIMARY KEY (").append(pks.stream().map((pk) -> "`" + pk.getName() + "`")
// .collect(Collectors.joining(","))).append(")").append("\n");
// }
if (multiPk) {
this.script.append(", CONSTRAINT ").append("uk_" + tableMapper.getTo() + "_unique_" + pks.stream().map((c) -> c).collect(Collectors.joining("_")))
.append(" UNIQUE(")
.append(pks.stream().map((c) -> c).collect(Collectors.joining(","))).append(")");
}
}


@Override
protected ColWrapper createColWrapper(IColMetaGetter c) {
return new ColWrapper(c) {
@Override
public String getMapperType() {
return convertType(this.meta);
}
};
}

@Override
protected void appendTabMeta(List<String> pks) {

}

/**
* https://www.runoob.com/mysql/mysql-data-types.html
* @param col
* @return
*/
private String convertType(IColMetaGetter col) {
DataType type = col.getType();
String colType = type.accept(new DataType.TypeVisitor<String>() {
@Override
public String bigInt(DataType type) {
return "BIGINT";
}

@Override
public String doubleType(DataType type) {
return "FLOAT8";
}

@Override
public String dateType(DataType type) {
return "DATE";
}

@Override
public String timestampType(DataType type) {
return "TIMESTAMP";
}

@Override
public String bitType(DataType type) {
return "BIT";
}

@Override
public String blobType(DataType type) {
return "BYTEA";
}

@Override
public String varcharType(DataType type) {
return "VARCHAR(" + type.getColumnSize() + ")";
}

@Override
public String intType(DataType type) {
return "INTEGER";
}

@Override
public String floatType(DataType type) {
return "FLOAT4";
}

@Override
public String decimalType(DataType type) {
return "DECIMAL";
}

@Override
public String timeType(DataType type) {
return "TIME";
}

@Override
public String tinyIntType(DataType dataType) {
return smallIntType(dataType);
}

@Override
public String smallIntType(DataType dataType) {
return "SMALLINT";
}
});

return colType + (!multiPk && col.isPk() ? " PRIMARY KEY" : StringUtils.EMPTY);
}

};

final CreateTableSqlBuilder createTableSqlBuilder = new PostgreSQLCreateTableSqlBuilder(tableMapper, ds, transformers);
return createTableSqlBuilder.build();
}

Expand Down
Loading

0 comments on commit e7b453c

Please sign in to comment.