Skip to content

Commit

Permalink
add DirtyRecordCreator to convert direct SQL insert statment
Browse files Browse the repository at this point in the history
  • Loading branch information
baisui1981 committed Oct 13, 2024
1 parent 57a0dba commit 4ac2ebc
Show file tree
Hide file tree
Showing 22 changed files with 979 additions and 426 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.core.statistics.plugin.task.util.DirtyRecord;
import com.alibaba.fastjson.JSON;

import com.datastax.driver.core.Cluster;
Expand Down Expand Up @@ -403,7 +404,7 @@ static Record buildRecord(Record record, Row rs, ColumnDefinitions metaData, int
}
} catch (Exception e) {
//TODO 这里识别为脏数据靠谱吗?
taskPluginCollector.collectDirtyRecord(record, e);
taskPluginCollector.collectDirtyRecord(DirtyRecord.create(record), e);
if (e instanceof DataXException) {
throw (DataXException) e;
}
Expand Down
16 changes: 8 additions & 8 deletions common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,14 @@

<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${jdk-version}</source>
<target>${jdk-version}</target>
<encoding>${project-sourceEncoding}</encoding>
</configuration>
</plugin>
<!-- <plugin>-->
<!-- <artifactId>maven-compiler-plugin</artifactId>-->
<!-- <configuration>-->
<!-- <source>${jdk-version}</source>-->
<!-- <target>${jdk-version}</target>-->
<!-- <encoding>${project-sourceEncoding}</encoding>-->
<!-- </configuration>-->
<!-- </plugin>-->
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -1,57 +1,49 @@
package com.alibaba.datax.common.plugin;

import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.element.DirtyRecordCreator;

/**
*
* 该接口提供给Task Plugin用来记录脏数据和自定义信息。 <br >
*
* <p>
* 1. 脏数据记录,TaskPluginCollector提供多种脏数据记录的适配,包括本地输出、集中式汇报等等<br >
* 2. 自定义信息,所有的task插件运行过程中可以通过TaskPluginCollector收集信息, <br >
* Job的插件在POST过程中通过getMessage()接口获取信息
*/
public abstract class TaskPluginCollector implements PluginCollector {
/**
* 收集脏数据
*
* @param dirtyRecord
* 脏数据信息
* @param t
* 异常信息
* @param errorMessage
* 错误的提示信息
*/
public abstract void collectDirtyRecord(final Record dirtyRecord,
final Throwable t, final String errorMessage);
/**
* 收集脏数据
*
* @param dirtyRecord 脏数据信息
* @param t 异常信息
* @param errorMessage 错误的提示信息
*/
public abstract void collectDirtyRecord( final DirtyRecordCreator dirtyRecord,
final Throwable t, final String errorMessage);

/**
* 收集脏数据
*
* @param dirtyRecord
* 脏数据信息
* @param errorMessage
* 错误的提示信息
*/
public void collectDirtyRecord(final Record dirtyRecord,
final String errorMessage) {
this.collectDirtyRecord(dirtyRecord, null, errorMessage);
}
/**
* 收集脏数据
*
* @param dirtyRecord 脏数据信息
* @param errorMessage 错误的提示信息
*/
public void collectDirtyRecord(final DirtyRecordCreator dirtyRecord,
final String errorMessage) {
this.collectDirtyRecord(dirtyRecord, null, errorMessage);
}

/**
* 收集脏数据
*
* @param dirtyRecord
* 脏数据信息
* @param t
* 异常信息
*/
public void collectDirtyRecord(final Record dirtyRecord, final Throwable t) {
this.collectDirtyRecord(dirtyRecord, t, "");
}
/**
* 收集脏数据
*
* @param dirtyRecord 脏数据信息
* @param t 异常信息
*/
public void collectDirtyRecord(final DirtyRecordCreator dirtyRecord, final Throwable t) {
this.collectDirtyRecord(dirtyRecord, t, "");
}

/**
* 收集自定义信息,Job插件可以通过getMessage获取该信息 <br >
* 如果多个key冲突,内部使用List记录同一个key,多个value情况。<br >
* */
public abstract void collectMessage(final String key, final String value);
/**
* 收集自定义信息,Job插件可以通过getMessage获取该信息 <br >
* 如果多个key冲突,内部使用List记录同一个key,多个value情况。<br >
*/
public abstract void collectMessage(final String key, final String value);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
package com.alibaba.datax.core.statistics.plugin.task.util;

import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.DirtyRecordCreator;
import com.alibaba.datax.common.element.ICol2Index;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.spi.ErrorCode;
import com.alibaba.fastjson.JSON;

import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

/**
* @author: 百岁([email protected]
* @create: 2024-10-13 09:49
**/
public class DirtyRecord implements Record {
private static final ErrorCode RUNTIME_ERROR = new ErrorCode() {
@Override
public String getCode() {
return "dirtyRecordError";
}

@Override
public String getDescription() {
return "dirtyRecordError";
}
};

private List<Column> columns = new ArrayList<Column>();

public static DirtyRecordCreator create(Record record) {
if (record == null) {
throw new IllegalArgumentException("param record can not be null");
}
return new DirtyRecordCreator() {
@Override
public Object createDirtyRecordDescriptor() {
return DirtyRecord.asDirtyRecord(record).getColumns();
}

@Override
public int getByteSize() {
return record.getByteSize();
}

@Override
public int getMemorySize() {
return record.getMemorySize();
}
};
}


public static DirtyRecord asDirtyRecord(final Record record) {
DirtyRecord result = new DirtyRecord();
for (int i = 0; i < record.getColumnNumber(); i++) {
result.addColumn(record.getColumn(i));
}

return result;
}

@Override
public String getString(String field, boolean origin) {
throw new UnsupportedOperationException();
}

@Override
public void setCol2Index(ICol2Index mapper) {
throw DataXException.asDataXException(RUNTIME_ERROR,
"该方法不支持!");
}

@Override
public ICol2Index getCol2Index() {
throw DataXException.asDataXException(RUNTIME_ERROR,
"该方法不支持!");
}

@Override
public void addColumn(Column column) {
this.columns.add(
DirtyColumn.asDirtyColumn(column, this.columns.size()));
}

@Override
public String toString() {
return JSON.toJSONString(this.columns);
}

@Override
public void setColumn(int i, Column column) {
throw DataXException.asDataXException(RUNTIME_ERROR,
"该方法不支持!");
}

@Override
public void setString(String field, String val) {
throw DataXException.asDataXException(RUNTIME_ERROR,
"该方法不支持!");
}

@Override
public void setColumn(String field, Object column) {
throw DataXException.asDataXException(RUNTIME_ERROR,
"该方法不支持!");
}

@Override
public Column getColumn(String field) {
throw DataXException.asDataXException(RUNTIME_ERROR,
"该方法不支持!");
}

@Override
public Column getColumn(int i) {
throw DataXException.asDataXException(RUNTIME_ERROR,
"该方法不支持!");
}

@Override
public int getColumnNumber() {
throw DataXException.asDataXException(RUNTIME_ERROR,
"该方法不支持!");
}

@Override
public int getByteSize() {
throw DataXException.asDataXException(RUNTIME_ERROR,
"该方法不支持!");
}

@Override
public int getMemorySize() {
throw DataXException.asDataXException(RUNTIME_ERROR,
"该方法不支持!");
}

public List<Column> getColumns() {
return columns;
}

public void setColumns(List<Column> columns) {
this.columns = columns;
}


static class DirtyColumn extends Column {
private int index;

public static Column asDirtyColumn(final Column column, int index) {
return new DirtyColumn(column, index);
}

private DirtyColumn(Column column, int index) {
this(null == column ? null : column.getRawData(),
null == column ? Type.NULL : column.getType(),
null == column ? 0 : column.getByteSize(), index);
}

public int getIndex() {
return index;
}

public void setIndex(int index) {
this.index = index;
}

@Override
public Long asLong() {
throw DataXException.asDataXException(RUNTIME_ERROR,
"该方法不支持!");
}

@Override
public Double asDouble() {
throw DataXException.asDataXException(RUNTIME_ERROR,
"该方法不支持!");
}

@Override
public String asString() {
throw DataXException.asDataXException(RUNTIME_ERROR,
"该方法不支持!");
}

@Override
public Date asDate() {
throw DataXException.asDataXException(RUNTIME_ERROR,
"该方法不支持!");
}

@Override
public Date asDate(String dateFormat) {
throw DataXException.asDataXException(RUNTIME_ERROR,
"该方法不支持!");
}

@Override
public byte[] asBytes() {
throw DataXException.asDataXException(RUNTIME_ERROR,
"该方法不支持!");
}

@Override
public Boolean asBoolean() {
throw DataXException.asDataXException(RUNTIME_ERROR,
"该方法不支持!");
}

@Override
public BigDecimal asBigDecimal() {
throw DataXException.asDataXException(RUNTIME_ERROR,
"该方法不支持!");
}

@Override
public BigInteger asBigInteger() {
throw DataXException.asDataXException(RUNTIME_ERROR,
"该方法不支持!");
}

private DirtyColumn(Object object, Type type, int byteSize, int index) {
super(object, type, byteSize);
this.setIndex(index);
}


}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.alibaba.datax.core.statistics.plugin.task;

import com.alibaba.datax.common.element.DirtyRecordCreator;
import com.alibaba.datax.common.element.RecordMetrix;
import com.alibaba.datax.core.statistics.communication.Communication;
import com.alibaba.datax.core.statistics.communication.CommunicationTool;
import com.alibaba.datax.common.constant.PluginType;
Expand Down Expand Up @@ -50,7 +52,7 @@ final public void collectMessage(String key, String value) {
}

@Override
public void collectDirtyRecord(Record dirtyRecord, Throwable t,
public void collectDirtyRecord(DirtyRecordCreator dirtyRecord, Throwable t,
String errorMessage) {

if (null == dirtyRecord) {
Expand Down
Loading

0 comments on commit 4ac2ebc

Please sign in to comment.