Skip to content

Commit

Permalink
add DirtyRecordCreator to convert direct SQL insert statment
Browse files Browse the repository at this point in the history
  • Loading branch information
baisui1981 committed Oct 13, 2024
1 parent e7b453c commit 1714b0e
Show file tree
Hide file tree
Showing 6 changed files with 21 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ private String getSqlServerType(DataType type) {
case FLOAT:
case DOUBLE:
case DECIMAL:
return "decimal(8,4)";
return "decimal(" + type.getColumnSize() + ", " + type.getDecimalDigits() + ")";
case DATE:
case TIME:
case TIMESTAMP:
Expand Down
3 changes: 2 additions & 1 deletion tis-datax/tis-datax-sqlserver-v2019-plugin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,12 @@
<version>${project.version}</version>
</dependency>

<!--https://github.com/microsoft/mssql-jdbc-->
<dependency>
<groupId>com.microsoft.sqlserver</groupId>
<artifactId>mssql-jdbc</artifactId>
<!-- <version>9.2.1.jre8</version>-->
<version>12.4.0.jre8</version>
<version>12.8.1.jre11</version>
</dependency>
</dependencies>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.core.statistics.plugin.task.util.DirtyRecord;
import com.google.common.collect.Lists;
import com.qlangtech.tis.hive.HdfsFileType;
import com.qlangtech.tis.hive.HdfsFormat;
Expand Down Expand Up @@ -153,7 +154,7 @@ public static MutablePair<Object[], Boolean> transportOneRecord(Record record, O
String message = String.format("字段类型转换错误:实际字段值为[%s].",
//colMeta.getType(),
column.toString());
taskPluginCollector.collectDirtyRecord(record, message);
taskPluginCollector.collectDirtyRecord(DirtyRecord.create(record), message);
transportResult.setRight(true);
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
import com.dtstack.chunjun.connector.mysql.source.MysqlInputFormat;
import com.dtstack.chunjun.converter.IDeserializationConverter;
import com.dtstack.chunjun.element.column.BigDecimalColumn;
import com.qlangtech.tis.plugin.ds.DBConfig;

import com.qlangtech.tis.plugin.ds.DataSourceFactory;
import com.qlangtech.tis.plugin.ds.IColMetaGetter;
import com.qlangtech.tis.plugins.incr.flink.chunjun.common.DialectUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,8 @@ public BigIntGetter(String colName, int colIndex) {

@Override
public Object getObject(GenericRowData rowData) {
return rowData.getField(colIndex);
Object val = rowData.getField(colIndex);
return ((Number) val).longValue();
}
}

Expand All @@ -204,7 +205,8 @@ public FloatGetter(String colName, int colIndex) {

@Override
public Object getObject(GenericRowData rowData) {
return rowData.getFloat(colIndex);
Object val = rowData.getField(colIndex);
return ((Number) val).floatValue();//.getFloat(colIndex);
}
}

Expand All @@ -231,7 +233,11 @@ public SmallIntGetter(String colName, int colIndex) {

@Override
public Object getObject(GenericRowData rowData) {
return rowData.getShort(colIndex);
Object val = rowData.getField(colIndex);
if (val instanceof java.lang.Byte) {
return ((java.lang.Byte) val).shortValue();
}
return (Short) val; //rowData.getShort(colIndex);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,19 +105,20 @@ public Optional<String> getEscapeChar() {
= new CreateTableSqlBuilder<ColWrapper>(IDataxProcessor.TableMap.create(tableName, tabMeta.colMetas), sourceMeta, Optional.empty()) {
@Override
protected ColWrapper createColWrapper(IColMetaGetter c) {
return new ColWrapper(c) {
return new ColWrapper(c, this.pks) {
@Override
public String getMapperType() {
return convertType(meta);
return convertType(this);
}

@Override
protected void appendExtraConstraint(BlockScriptBuffer ddlScript) {
// super.appendExtraConstraint(ddlScript);
// appendExtraConstraint
Optional<String> f
= pks.stream().filter((pk) -> pk.equals(meta.getName())).findFirst();
if (f.isPresent()) {

// Optional<String> f
// = pks.stream().filter((pk) -> pk.equals(this.getName())).findFirst();
if (this.isPk()) {
ddlScript.append(" PRIMARY KEY NOT ENFORCED");
}
}
Expand Down Expand Up @@ -185,7 +186,7 @@ protected void appendTabMeta(List<String> pks) {
}
}

private String convertType(IColMetaGetter col) {
private String convertType(ColWrapper col) {
// https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/types/
return col.getType().accept(new DataType.TypeVisitor<String>() {
@Override
Expand Down

0 comments on commit 1714b0e

Please sign in to comment.