add transformer for TIS, overwrite cols for RowDataTransformerMapper
baisui1981 committed Jul 8, 2024
1 parent fb0b6b9 commit f4017c2
Showing 7 changed files with 81 additions and 43 deletions.
==== File 1/7: Chunjun sink factory (createRowDataSinkFunc) ====
@@ -47,6 +47,7 @@
 import com.qlangtech.plugins.incr.flink.cdc.FlinkCol;
 import com.qlangtech.tis.TIS;
 import com.qlangtech.tis.async.message.client.consumer.IFlinkColCreator;
+import com.qlangtech.tis.async.message.client.consumer.impl.MQListenerFactory;
 import com.qlangtech.tis.datax.IDataXNameAware;
 import com.qlangtech.tis.datax.IDataxProcessor;
 import com.qlangtech.tis.datax.IDataxReader;
@@ -83,6 +84,7 @@
 import com.qlangtech.tis.runtime.module.misc.IFieldErrorHandler;
 import com.qlangtech.tis.sql.parser.tuple.creator.EntityName;
 import com.qlangtech.tis.sql.parser.tuple.creator.IStreamIncrGenerateStrategy;
+import com.qlangtech.tis.util.HeteroEnum;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.flink.api.common.io.OutputFormat;
@@ -205,22 +207,27 @@ public RowDataSinkFunc createRowDataSinkFunc(IDataxProcessor dataxProcessor
+ " can not find matched table in:["
+ tabs.stream().map((t) -> t.getName()).collect(Collectors.joining(",")) + "]");
}

final SelectedTab tab = (SelectedTab) selectedTab.get();
final CreateChunjunSinkFunctionResult sinkFunc
= createSinFunctionResult(dataxProcessor
, (SelectedTab) selectedTab.get(), tabName.getTo(), shallInitSinkTable);
, tab, tabName.getTo(), shallInitSinkTable);

if (this.parallelism == null) {
throw new IllegalStateException("param parallelism can not be null");
}

//String dataXName, TableAlias tabAlias, ISelectedTab tab, IFlinkColCreator<FlinkCol> sourceFlinkColCreator
MQListenerFactory sourceListenerFactory = HeteroEnum.getIncrSourceListenerFactory(dataxProcessor.identityValue());
IFlinkColCreator<FlinkCol> sourceFlinkColCreator = Objects.requireNonNull(sourceListenerFactory, "sourceListenerFactory").createFlinkColCreator();
List<FlinkCol> sourceColsMeta = FlinkCol.getAllTabColsMeta(tab.getCols(), sourceFlinkColCreator);

return new RowDataSinkFunc(this, tabName
return new RowDataSinkFunc(tabName
, sinkFunc.getSinkFunction()
, sinkFunc.primaryKeys
, sourceColsMeta
, AbstractRowDataMapper.getAllTabColsMeta(Objects.requireNonNull(sinkFunc.tableCols, "tabCols can not be null").getCols())
, supportUpsetDML()
, this.parallelism);
, this.parallelism, RowDataSinkFunc.createTransformerRules(dataxProcessor.identityValue(), tabName, tab, sourceFlinkColCreator));
}


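The hunk above decouples source-side from sink-side column metadata: the incremental-source plugin configured for the pipeline now supplies its own IFlinkColCreator, so reader-specific type mappings reach the sink stage intact. A minimal sketch of the lookup, assuming a hypothetical pipeline identity "mysql2doris" and a SelectedTab named tab in scope:

    // Resolve the incremental source plugin for the pipeline; "mysql2doris" is a
    // hypothetical identity value, normally obtained via dataxProcessor.identityValue().
    MQListenerFactory sourceListenerFactory = HeteroEnum.getIncrSourceListenerFactory("mysql2doris");
    IFlinkColCreator<FlinkCol> sourceFlinkColCreator = sourceListenerFactory.createFlinkColCreator();

    // Source cols follow the reader's type mapping; sink cols keep the generic mapping.
    List<FlinkCol> sourceColsMeta = FlinkCol.getAllTabColsMeta(tab.getCols(), sourceFlinkColCreator);
    List<FlinkCol> sinkColsMeta = AbstractRowDataMapper.getAllTabColsMeta(tab.getCols());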
==== File 2/7: FlinkCol ====
@@ -18,6 +18,7 @@

 package com.qlangtech.plugins.incr.flink.cdc;
 
+import com.qlangtech.tis.async.message.client.consumer.IFlinkColCreator;
 import com.qlangtech.tis.plugin.ds.IColMetaGetter;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.table.data.RowData;
@@ -28,6 +29,9 @@
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.format.DateTimeFormatter;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 /**
  * @author: 百岁([email protected]
@@ -61,6 +65,13 @@ public FlinkCol(IColMetaGetter meta, com.qlangtech.tis.plugin.ds.DataType colTyp
         this(meta, colType, type, new NoOpProcess(), rowDataValGetter);
     }
 
+    public static <T extends IColMetaGetter> List<FlinkCol> getAllTabColsMeta(List<T> colsMeta, IFlinkColCreator<FlinkCol> flinkColCreator) {
+        final AtomicInteger colIndex = new AtomicInteger();
+        return colsMeta.stream()
+                .map((c) -> flinkColCreator.build(c, colIndex.getAndIncrement()))
+                .collect(Collectors.toList());
+    }
+
     public Object getRowDataVal(RowData row) {
         try {
             return rowDataValGetter.getFieldOrNull(row);
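The helper above is hoisted out of AbstractRowDataMapper (file 4/7) so code outside the mapper hierarchy can build FlinkCol lists as well. A minimal usage sketch, assuming cols holds the column metadata of a selected table:

    // Turn plain column metadata into FlinkCol descriptors; the creator receives each
    // column together with its ordinal position in the row.
    List<FlinkCol> flinkCols = FlinkCol.getAllTabColsMeta(cols, AbstractRowDataMapper::mapFlinkCol);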
==== File 3/7: TabSinkFunc ====
@@ -19,19 +19,26 @@
 package com.qlangtech.tis.realtime;
 
 import com.qlangtech.plugins.incr.flink.cdc.FlinkCol;
+import com.qlangtech.tis.async.message.client.consumer.IFlinkColCreator;
 import com.qlangtech.tis.datax.TableAlias;
+import com.qlangtech.tis.plugin.datax.transformer.RecordTransformerRules;
+import com.qlangtech.tis.plugin.ds.ISelectedTab;
 import com.qlangtech.tis.realtime.dto.DTOStream;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.commons.lang3.tuple.Triple;
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.table.connector.sink.DynamicTableSink.Context;
 import org.apache.flink.table.connector.sink.SinkFunctionProvider;
 
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
+import java.util.Optional;
 
 /**
  * <TRANSFER_OBJ/> can be:
@@ -58,18 +65,20 @@ public List<FlinkCol> getSourceColsMeta() {
     }
 
     private transient Pair<String, FilterFunction<SINK_TRANSFER_OBJ>> sourceFilter;
+    protected transient final Optional<Triple<RecordTransformerRules, ISelectedTab, IFlinkColCreator<FlinkCol>>> transformers;
 
     public TabSinkFunc(TableAlias tab, List<String> primaryKeys, SinkFunction<SINK_TRANSFER_OBJ> sinkFunction
             , final List<FlinkCol> sinkColsMeta, int sinkTaskParallelism) {
-        this(tab, primaryKeys, sinkFunction, sinkColsMeta, sinkColsMeta, sinkTaskParallelism);
+        this(tab, primaryKeys, sinkFunction, sinkColsMeta, sinkColsMeta, sinkTaskParallelism, Optional.empty());
     }
 
     /**
      * @param tab
      * @param sinkFunction
      */
     public TabSinkFunc(TableAlias tab, List<String> primaryKeys, SinkFunction<SINK_TRANSFER_OBJ> sinkFunction
-            , final List<FlinkCol> sourceColsMeta, final List<FlinkCol> sinkColsMeta, int sinkTaskParallelism) {
+            , final List<FlinkCol> sourceColsMeta, final List<FlinkCol> sinkColsMeta, int sinkTaskParallelism
+            , Optional<Triple<RecordTransformerRules, ISelectedTab, IFlinkColCreator<FlinkCol>>> transformerOpt) {
         if (CollectionUtils.isEmpty(sinkColsMeta)) {
             throw new IllegalArgumentException("colsMeta can not be empty");
         }
@@ -82,7 +91,7 @@ public TabSinkFunc(TableAlias tab, List<String> primaryKeys, SinkFunction<SINK_T
         this.sinkTaskParallelism = sinkTaskParallelism;
         this.sinkColsMeta = sinkColsMeta;
         this.sourceColsMeta = sourceColsMeta;
-        // this.env = env;
+        this.transformers = transformerOpt;
     }
 
     public List<String> getPrimaryKeys() {
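The trailing Optional keeps TabSinkFunc transformer-agnostic: the rules, the table they apply to, and the source-side column creator travel together, and sinks without transformer support pass Optional.empty() (the shorter constructor above now does that on their behalf). A sketch of assembling the payload, assuming rules, selectedTab and sourceFlinkColCreator are in scope:

    // Bundle everything streamMap needs later (see file 5/7): the rules, the table
    // whose columns they overwrite, and the creator used to re-derive FlinkCol metadata.
    Optional<Triple<RecordTransformerRules, ISelectedTab, IFlinkColCreator<FlinkCol>>> transformerOpt =
            (rules != null) ? Optional.of(Triple.of(rules, selectedTab, sourceFlinkColCreator))
                            : Optional.empty();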
==== File 4/7: AbstractRowDataMapper ====
@@ -23,7 +23,6 @@
 import com.qlangtech.plugins.incr.flink.cdc.RowFieldGetterFactory;
 import com.qlangtech.tis.async.message.client.consumer.IFlinkColCreator;
 import com.qlangtech.tis.coredefine.module.action.TargetResName;
-import com.qlangtech.tis.datax.IStreamTableMeataCreator;
 import com.qlangtech.tis.datax.IStreamTableMeta;
 import com.qlangtech.tis.plugin.ds.DataType;
 import com.qlangtech.tis.plugin.ds.DataTypeMeta;
@@ -92,23 +91,16 @@ public static List<FlinkCol> getAllTabColsMeta(IStreamTableMeta streamTableMeta)
     }
 
     public static <T extends IColMetaGetter> List<FlinkCol> getAllTabColsMeta(List<T> colsMeta) {
-        return getAllTabColsMeta(colsMeta, AbstractRowDataMapper::mapFlinkCol);
+        return FlinkCol.getAllTabColsMeta(colsMeta, AbstractRowDataMapper::mapFlinkCol);
     }
 
 
-    public static <T extends IColMetaGetter> List<FlinkCol> getAllTabColsMeta(List<T> colsMeta, IFlinkColCreator<FlinkCol> flinkColCreator) {
-        final AtomicInteger colIndex = new AtomicInteger();
-        return colsMeta.stream()
-                .map((c) -> flinkColCreator.build(c, colIndex.getAndIncrement()))
-                .collect(Collectors.toList());
-    }
-
     public static <T extends IColMetaGetter> FlinkColMapper getAllTabColsMetaMapper(List<T> colsMeta) {
         return getAllTabColsMetaMapper(colsMeta, AbstractRowDataMapper::mapFlinkCol);
     }
 
     public static <T extends IColMetaGetter> FlinkColMapper getAllTabColsMetaMapper(List<T> colsMeta, IFlinkColCreator<FlinkCol> flinkColCreator) {
-        List<FlinkCol> cols = getAllTabColsMeta(colsMeta, flinkColCreator);
+        List<FlinkCol> cols = FlinkCol.getAllTabColsMeta(colsMeta, flinkColCreator);
         return new FlinkColMapper(cols.stream().collect(Collectors.toMap((c) -> c.name, (c) -> c)));
     }
 
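Only the implementation moved; AbstractRowDataMapper keeps its public one-arg overload as a thin delegate, so existing call sites compile unchanged. The two calls below are equivalent for any cols list whose elements implement IColMetaGetter:

    // Backward-compatible entry point, now delegating:
    List<FlinkCol> a = AbstractRowDataMapper.getAllTabColsMeta(cols);
    // Relocated implementation on FlinkCol, creator made explicit:
    List<FlinkCol> b = FlinkCol.getAllTabColsMeta(cols, AbstractRowDataMapper::mapFlinkCol);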
==== File 5/7: RowDataSinkFunc ====
@@ -20,17 +20,17 @@

 import com.qlangtech.plugins.incr.flink.cdc.FlinkCol;
 import com.qlangtech.tis.async.message.client.consumer.IFlinkColCreator;
-import com.qlangtech.tis.datax.IDataXNameAware;
 import com.qlangtech.tis.datax.IDataxProcessor;
 import com.qlangtech.tis.datax.TableAlias;
 import com.qlangtech.tis.plugin.datax.transformer.RecordTransformerRules;
+import com.qlangtech.tis.plugin.ds.ISelectedTab;
 import com.qlangtech.tis.plugin.incr.TISSinkFactory;
 import com.qlangtech.tis.plugins.incr.flink.cdc.DTO2RowDataMapper;
-import com.qlangtech.tis.plugins.incr.flink.cdc.ReocrdTransformerMapper;
 import com.qlangtech.tis.plugins.incr.flink.cdc.impl.RowDataTransformerMapper;
 import com.qlangtech.tis.realtime.dto.DTOStream;
 import com.qlangtech.tis.realtime.transfer.DTO;
 import com.qlangtech.tis.util.IPluginContext;
+import org.apache.commons.lang3.tuple.Triple;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.table.connector.sink.SinkFunctionProvider;
@@ -89,24 +89,32 @@ protected DataStream<DTO> streamMap(DTOStream sourceStream) {
      */
     public final static class RowDataSinkFunc extends TabSinkFunc<RowData> {
 
-        private final Optional<RecordTransformerRules> transformers;
-        // private final IFlinkColCreator<FlinkCol> flinkColCreator;
+        public static Optional<Triple<RecordTransformerRules, ISelectedTab, IFlinkColCreator<FlinkCol>>>
+        createTransformerRules(String dataXName, TableAlias tabAlias, ISelectedTab tab, IFlinkColCreator<FlinkCol> sourceFlinkColCreator) {
+
-        public RowDataSinkFunc(IDataXNameAware dataXName, TableAlias tab
-                , SinkFunction<RowData> sinkFunction, List<String> primaryKeys, List<FlinkCol> colsMeta
-                , boolean supportUpset, int sinkTaskParallelism) {
-            this(dataXName, tab, sinkFunction, primaryKeys, colsMeta, colsMeta, supportUpset, sinkTaskParallelism);
+            RecordTransformerRules transformerRules = RecordTransformerRules.loadTransformerRules(IPluginContext.namedContext(dataXName), tabAlias.getFrom());
+            Optional<Triple<RecordTransformerRules, ISelectedTab, IFlinkColCreator<FlinkCol>>> transformerOpt
+                    = (transformerRules != null) ? Optional.of(Triple.of(transformerRules, tab, sourceFlinkColCreator)) : Optional.empty();
+            return transformerOpt;
         }
+        // private final IFlinkColCreator<FlinkCol> flinkColCreator;
 
-        public RowDataSinkFunc(IDataXNameAware dataXName, TableAlias tab
+        // public RowDataSinkFunc(TableAlias tab
+        //     , SinkFunction<RowData> sinkFunction, List<String> primaryKeys, List<FlinkCol> colsMeta
+        //     , boolean supportUpset, int sinkTaskParallelism) {
+        //     this(tab, sinkFunction, primaryKeys, colsMeta, colsMeta, supportUpset, sinkTaskParallelism, Optional.empty());
+        // }
+
 
+        public RowDataSinkFunc(TableAlias tab
                 , SinkFunction<RowData> sinkFunction //
                 , List<String> primaryKeys //
                 , final List<FlinkCol> sourceColsMeta //
                 , List<FlinkCol> sinkColsMeta //
-                , boolean supportUpset, int sinkTaskParallelism) {
-            super(tab, primaryKeys, sinkFunction, sourceColsMeta, sinkColsMeta, sinkTaskParallelism);
-            this.transformers
-                    = Optional.ofNullable(RecordTransformerRules.loadTransformerRules(IPluginContext.namedContext(dataXName.getTISDataXName()), tab.getFrom()));
+                , boolean supportUpset, int sinkTaskParallelism, Optional<Triple<RecordTransformerRules, ISelectedTab, IFlinkColCreator<FlinkCol>>> transformerOpt) {
+            super(tab, primaryKeys, sinkFunction, sourceColsMeta, sinkColsMeta, sinkTaskParallelism
+                    , transformerOpt);
 
             //this.flinkColCreator = Objects.requireNonNull(flinkColCreator, "flinkColCreator can not be null");
             if (supportUpset) {
                 this.setSourceFilter("skipUpdateBeforeEvent"
@@ -136,7 +144,13 @@ protected DataStream<RowData> streamMap(DTOStream sourceStream) {
         }
 
         if (transformers.isPresent()) {
-            return result.map(new RowDataTransformerMapper(this.sourceColsMeta, transformers.get()))
+            Triple<RecordTransformerRules, ISelectedTab, IFlinkColCreator<FlinkCol>> triple = transformers.get();
+            RecordTransformerRules rule = triple.getLeft();
+            ISelectedTab table = triple.getMiddle();
+
+            return result.map(new RowDataTransformerMapper(
+                    FlinkCol.getAllTabColsMeta(rule.overwriteCols(table.getCols())
+                            , Objects.requireNonNull(triple.getRight(), "flinkColCreator")), triple.getLeft()))
                     .name(tab.getFrom() + "_transformer").setParallelism(this.sinkTaskParallelism);
         } else {
             return result;
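This hunk is the fix named in the commit message: the RowDataTransformerMapper used to be built from this.sourceColsMeta, i.e. the pre-transform schema, so a transformer that adds or retypes columns saw a stale layout. The columns are now re-derived through rule.overwriteCols first. An illustration with a hypothetical two-column table:

    // Suppose table has cols [id, name] and a transformer appends a computed column
    // "name_upper" (hypothetical). overwriteCols yields [id, name, name_upper], and the
    // mapper must be built from that transformed layout rather than the raw source cols:
    List<FlinkCol> mapperCols = FlinkCol.getAllTabColsMeta(
            rule.overwriteCols(table.getCols()), sourceFlinkColCreator);
    // result.map(new RowDataTransformerMapper(mapperCols, rule)) then matches what the
    // transformer actually emits.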
==== File 6/7: source table registration (registerSourceTable) ====
@@ -217,11 +217,11 @@ protected void registerSourceTable(StreamTableEnvironment tabEnv
         if (transformers.isPresent()) {
             tRules = transformers.get();
             srcCols = tRules.overwriteCols(srcCols);
-            cols = AbstractRowDataMapper.getAllTabColsMeta(srcCols, Objects.requireNonNull(this.flinkColCreator, "flinkColCreator"));
+            cols = FlinkCol.getAllTabColsMeta(srcCols, Objects.requireNonNull(this.flinkColCreator, "flinkColCreator"));
             transformerMapper = new RowTransformerMapper(cols, transformers.get());
             outputTypeSchema = RowUtils.outputTypeSchema(transformerMapper.cols, selectedTab.getPrimaryKeys());
         } else {
-            cols = AbstractRowDataMapper.getAllTabColsMeta(srcCols, Objects.requireNonNull(this.flinkColCreator, "flinkColCreator"));
+            cols = FlinkCol.getAllTabColsMeta(srcCols, Objects.requireNonNull(this.flinkColCreator, "flinkColCreator"));
             outputTypeSchema = RowUtils.outputTypeSchema(cols, selectedTab.getPrimaryKeys());
         }
 
==== File 7/7: ElasticSearchSinkFactory ====
@@ -42,16 +42,17 @@
 import com.qlangtech.tis.plugin.annotation.Validator;
 import com.qlangtech.tis.plugin.datax.DataXElasticsearchWriter;
 import com.qlangtech.tis.plugin.datax.elastic.ElasticEndpoint;
-import com.qlangtech.tis.plugin.ds.CMeta;
+import com.qlangtech.tis.plugin.datax.transformer.RecordTransformerRules;
 import com.qlangtech.tis.plugin.ds.IColMetaGetter;
 import com.qlangtech.tis.plugin.ds.ISelectedTab;
 import com.qlangtech.tis.plugins.incr.flink.cdc.AbstractRowDataMapper;
 import com.qlangtech.tis.realtime.BasicTISSinkFactory;
 import com.qlangtech.tis.realtime.TabSinkFunc;
 import com.qlangtech.tis.runtime.module.misc.IControlMsgHandler;
-import org.apache.commons.collections.CollectionUtils;
+import com.qlangtech.tis.util.IPluginContext;
 import org.apache.commons.compress.utils.Lists;
 import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.tuple.Triple;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
@@ -90,13 +91,13 @@ public class ElasticSearchSinkFactory extends BasicTISSinkFactory<RowData> {
     private static final Logger logger = LoggerFactory.getLogger(ElasticSearchSinkFactory.class);
     private static final int DEFAULT_PARALLELISM = 1;// parallelism
     // bulk.flush.max.actions
-    @FormField(ordinal = 0, type = FormFieldType.INT_NUMBER, validate = Validator.integer)
+    @FormField(ordinal = 1, advance = true, type = FormFieldType.INT_NUMBER, validate = Validator.integer)
     public Integer bulkFlushMaxActions;
 
-    @FormField(ordinal = 1, type = FormFieldType.INT_NUMBER, validate = Validator.integer)
+    @FormField(ordinal = 2, advance = true, type = FormFieldType.INT_NUMBER, validate = Validator.integer)
     public Integer bulkFlushMaxSizeMb;
 
-    @FormField(ordinal = 2, type = FormFieldType.INT_NUMBER, validate = {Validator.integer, Validator.require})
+    @FormField(ordinal = 0, type = FormFieldType.INT_NUMBER, validate = {Validator.integer, Validator.require})
     public Integer bulkFlushIntervalMs;
 
     static {
@@ -166,15 +167,15 @@ public Map<TableAlias, TabSinkFunc<RowData>> createSinkFunction(IDataxProcessor
         IFlinkColCreator<FlinkCol> flinkColCreator = AbstractRowDataMapper::mapFlinkCol;
 
         Objects.requireNonNull(tab, "tab ca not be null");
-        final List<FlinkCol> sourceColsMeta = AbstractRowDataMapper.getAllTabColsMeta(tab.getCols(), sourceFlinkColCreator);
-        final List<FlinkCol> sinkColsMeta = AbstractRowDataMapper.getAllTabColsMeta(sinkMcols, flinkColCreator);
-
+        final List<FlinkCol> sinkColsMeta = FlinkCol.getAllTabColsMeta(sinkMcols, flinkColCreator);
         ElasticsearchSink.Builder<RowData> sinkBuilder
                 = new ElasticsearchSink.Builder<>(transportAddresses
                         , new DefaultElasticsearchSinkFunction(
                                 esCols.stream().map((c) -> c.getName()).collect(Collectors.toSet())
                                 , sinkColsMeta
                                 //, firstPK.get().getName()
-                                ,primaryKeys.get(0)
+                                , primaryKeys.get(0)
                                 , dataXWriter.getIndexName()));
 
         if (this.bulkFlushMaxActions != null) {
@@ -203,12 +204,16 @@ public Void visit(UsernamePassword accessKey) {
                 return null;
             }
         });
+        final List<FlinkCol> sourceColsMeta = FlinkCol.getAllTabColsMeta(tab.getCols(), sourceFlinkColCreator);
+
+
+        Optional<Triple<RecordTransformerRules, ISelectedTab, IFlinkColCreator<FlinkCol>>> transformerOpt
+                = RowDataSinkFunc.createTransformerRules(dataxProcessor.identityValue(), esSchema, tab, sourceFlinkColCreator);
         return Collections.singletonMap(esSchema
-                , new RowDataSinkFunc(this, esSchema, sinkBuilder.build(), primaryKeys
+                , new RowDataSinkFunc(esSchema, sinkBuilder.build(), primaryKeys
                 , sourceColsMeta
                 , sinkColsMeta
-                , true, DEFAULT_PARALLELISM));
+                , true, DEFAULT_PARALLELISM, transformerOpt));
     }


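Every RowDataSinkFunc call site now follows the same three steps, condensed here from the ElasticSearch factory above (identifiers as in the diff; note the constructor no longer needs the factory's this just to resolve the dataX name):

    // 1. Source-side metadata from the reader plugin's column creator.
    List<FlinkCol> sourceColsMeta = FlinkCol.getAllTabColsMeta(tab.getCols(), sourceFlinkColCreator);

    // 2. Optional transformer rules for the table, resolved once via the static helper.
    Optional<Triple<RecordTransformerRules, ISelectedTab, IFlinkColCreator<FlinkCol>>> transformerOpt =
            RowDataSinkFunc.createTransformerRules(dataxProcessor.identityValue(), esSchema, tab, sourceFlinkColCreator);

    // 3. Hand both to the sink function together with the usual metadata.
    new RowDataSinkFunc(esSchema, sinkBuilder.build(), primaryKeys,
            sourceColsMeta, sinkColsMeta, true, DEFAULT_PARALLELISM, transformerOpt);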
