diff --git a/tis-incr/tis-chunjun-base-plugin/src/main/java/com/qlangtech/tis/plugins/incr/flink/connector/ChunjunSinkFactory.java b/tis-incr/tis-chunjun-base-plugin/src/main/java/com/qlangtech/tis/plugins/incr/flink/connector/ChunjunSinkFactory.java index 6a5db0554..809c3400e 100644 --- a/tis-incr/tis-chunjun-base-plugin/src/main/java/com/qlangtech/tis/plugins/incr/flink/connector/ChunjunSinkFactory.java +++ b/tis-incr/tis-chunjun-base-plugin/src/main/java/com/qlangtech/tis/plugins/incr/flink/connector/ChunjunSinkFactory.java @@ -47,6 +47,7 @@ import com.qlangtech.plugins.incr.flink.cdc.FlinkCol; import com.qlangtech.tis.TIS; import com.qlangtech.tis.async.message.client.consumer.IFlinkColCreator; +import com.qlangtech.tis.async.message.client.consumer.impl.MQListenerFactory; import com.qlangtech.tis.datax.IDataXNameAware; import com.qlangtech.tis.datax.IDataxProcessor; import com.qlangtech.tis.datax.IDataxReader; @@ -83,6 +84,7 @@ import com.qlangtech.tis.runtime.module.misc.IFieldErrorHandler; import com.qlangtech.tis.sql.parser.tuple.creator.EntityName; import com.qlangtech.tis.sql.parser.tuple.creator.IStreamIncrGenerateStrategy; +import com.qlangtech.tis.util.HeteroEnum; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang.StringUtils; import org.apache.flink.api.common.io.OutputFormat; @@ -205,22 +207,27 @@ public RowDataSinkFunc createRowDataSinkFunc(IDataxProcessor dataxProcessor + " can not find matched table in:[" + tabs.stream().map((t) -> t.getName()).collect(Collectors.joining(",")) + "]"); } - + final SelectedTab tab = (SelectedTab) selectedTab.get(); final CreateChunjunSinkFunctionResult sinkFunc = createSinFunctionResult(dataxProcessor - , (SelectedTab) selectedTab.get(), tabName.getTo(), shallInitSinkTable); + , tab, tabName.getTo(), shallInitSinkTable); if (this.parallelism == null) { throw new IllegalStateException("param parallelism can not be null"); } +//String dataXName, TableAlias tabAlias, ISelectedTab tab, IFlinkColCreator sourceFlinkColCreator + MQListenerFactory sourceListenerFactory = HeteroEnum.getIncrSourceListenerFactory(dataxProcessor.identityValue()); + IFlinkColCreator sourceFlinkColCreator = Objects.requireNonNull(sourceListenerFactory, "sourceListenerFactory").createFlinkColCreator(); + List sourceColsMeta = FlinkCol.getAllTabColsMeta(tab.getCols(), sourceFlinkColCreator); - return new RowDataSinkFunc(this, tabName + return new RowDataSinkFunc(tabName , sinkFunc.getSinkFunction() , sinkFunc.primaryKeys + , sourceColsMeta , AbstractRowDataMapper.getAllTabColsMeta(Objects.requireNonNull(sinkFunc.tableCols, "tabCols can not be null").getCols()) , supportUpsetDML() - , this.parallelism); + , this.parallelism, RowDataSinkFunc.createTransformerRules(dataxProcessor.identityValue(), tabName, tab, sourceFlinkColCreator)); } diff --git a/tis-incr/tis-flink-extends/src/main/java/com/qlangtech/plugins/incr/flink/cdc/FlinkCol.java b/tis-incr/tis-flink-extends/src/main/java/com/qlangtech/plugins/incr/flink/cdc/FlinkCol.java index a074ae72f..6a7fbd7e7 100644 --- a/tis-incr/tis-flink-extends/src/main/java/com/qlangtech/plugins/incr/flink/cdc/FlinkCol.java +++ b/tis-incr/tis-flink-extends/src/main/java/com/qlangtech/plugins/incr/flink/cdc/FlinkCol.java @@ -18,6 +18,7 @@ package com.qlangtech.plugins.incr.flink.cdc; +import com.qlangtech.tis.async.message.client.consumer.IFlinkColCreator; import com.qlangtech.tis.plugin.ds.IColMetaGetter; import org.apache.commons.lang3.StringUtils; import org.apache.flink.table.data.RowData; @@ -28,6 +29,9 @@ import java.time.LocalDate; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; /** * @author: 百岁(baisui@qlangtech.com) @@ -61,6 +65,13 @@ public FlinkCol(IColMetaGetter meta, com.qlangtech.tis.plugin.ds.DataType colTyp this(meta, colType, type, new NoOpProcess(), rowDataValGetter); } + public static List getAllTabColsMeta(List colsMeta, IFlinkColCreator flinkColCreator) { + final AtomicInteger colIndex = new AtomicInteger(); + return colsMeta.stream() + .map((c) -> flinkColCreator.build(c, colIndex.getAndIncrement())) + .collect(Collectors.toList()); + } + public Object getRowDataVal(RowData row) { try { return rowDataValGetter.getFieldOrNull(row); diff --git a/tis-incr/tis-flink-extends/src/main/java/com/qlangtech/tis/realtime/TabSinkFunc.java b/tis-incr/tis-flink-extends/src/main/java/com/qlangtech/tis/realtime/TabSinkFunc.java index 736ed92d1..3ee38f471 100644 --- a/tis-incr/tis-flink-extends/src/main/java/com/qlangtech/tis/realtime/TabSinkFunc.java +++ b/tis-incr/tis-flink-extends/src/main/java/com/qlangtech/tis/realtime/TabSinkFunc.java @@ -19,11 +19,15 @@ package com.qlangtech.tis.realtime; import com.qlangtech.plugins.incr.flink.cdc.FlinkCol; +import com.qlangtech.tis.async.message.client.consumer.IFlinkColCreator; import com.qlangtech.tis.datax.TableAlias; +import com.qlangtech.tis.plugin.datax.transformer.RecordTransformerRules; +import com.qlangtech.tis.plugin.ds.ISelectedTab; import com.qlangtech.tis.realtime.dto.DTOStream; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; +import org.apache.commons.lang3.tuple.Triple; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; @@ -31,7 +35,10 @@ import org.apache.flink.table.connector.sink.DynamicTableSink.Context; import org.apache.flink.table.connector.sink.SinkFunctionProvider; +import java.util.Collections; import java.util.List; +import java.util.Objects; +import java.util.Optional; /** * 可以是用: @@ -58,10 +65,11 @@ public List getSourceColsMeta() { } private transient Pair> sourceFilter; + protected transient final Optional>> transformers; public TabSinkFunc(TableAlias tab, List primaryKeys, SinkFunction sinkFunction , final List sinkColsMeta, int sinkTaskParallelism) { - this(tab, primaryKeys, sinkFunction, sinkColsMeta, sinkColsMeta, sinkTaskParallelism); + this(tab, primaryKeys, sinkFunction, sinkColsMeta, sinkColsMeta, sinkTaskParallelism, Optional.empty()); } /** @@ -69,7 +77,8 @@ public TabSinkFunc(TableAlias tab, List primaryKeys, SinkFunction primaryKeys, SinkFunction sinkFunction - , final List sourceColsMeta, final List sinkColsMeta, int sinkTaskParallelism) { + , final List sourceColsMeta, final List sinkColsMeta, int sinkTaskParallelism + , Optional>> transformerOpt) { if (CollectionUtils.isEmpty(sinkColsMeta)) { throw new IllegalArgumentException("colsMeta can not be empty"); } @@ -82,7 +91,7 @@ public TabSinkFunc(TableAlias tab, List primaryKeys, SinkFunction getPrimaryKeys() { diff --git a/tis-incr/tis-realtime-flink/src/main/java/com/qlangtech/tis/plugins/incr/flink/cdc/AbstractRowDataMapper.java b/tis-incr/tis-realtime-flink/src/main/java/com/qlangtech/tis/plugins/incr/flink/cdc/AbstractRowDataMapper.java index 4c8610b41..e1dc917a6 100644 --- a/tis-incr/tis-realtime-flink/src/main/java/com/qlangtech/tis/plugins/incr/flink/cdc/AbstractRowDataMapper.java +++ b/tis-incr/tis-realtime-flink/src/main/java/com/qlangtech/tis/plugins/incr/flink/cdc/AbstractRowDataMapper.java @@ -23,7 +23,6 @@ import com.qlangtech.plugins.incr.flink.cdc.RowFieldGetterFactory; import com.qlangtech.tis.async.message.client.consumer.IFlinkColCreator; import com.qlangtech.tis.coredefine.module.action.TargetResName; -import com.qlangtech.tis.datax.IStreamTableMeataCreator; import com.qlangtech.tis.datax.IStreamTableMeta; import com.qlangtech.tis.plugin.ds.DataType; import com.qlangtech.tis.plugin.ds.DataTypeMeta; @@ -92,23 +91,16 @@ public static List getAllTabColsMeta(IStreamTableMeta streamTableMeta) } public static List getAllTabColsMeta(List colsMeta) { - return getAllTabColsMeta(colsMeta, AbstractRowDataMapper::mapFlinkCol); + return FlinkCol.getAllTabColsMeta(colsMeta, AbstractRowDataMapper::mapFlinkCol); } - public static List getAllTabColsMeta(List colsMeta, IFlinkColCreator flinkColCreator) { - final AtomicInteger colIndex = new AtomicInteger(); - return colsMeta.stream() - .map((c) -> flinkColCreator.build(c, colIndex.getAndIncrement())) - .collect(Collectors.toList()); - } - public static FlinkColMapper getAllTabColsMetaMapper(List colsMeta) { return getAllTabColsMetaMapper(colsMeta, AbstractRowDataMapper::mapFlinkCol); } public static FlinkColMapper getAllTabColsMetaMapper(List colsMeta, IFlinkColCreator flinkColCreator) { - List cols = getAllTabColsMeta(colsMeta, flinkColCreator); + List cols = FlinkCol.getAllTabColsMeta(colsMeta, flinkColCreator); return new FlinkColMapper(cols.stream().collect(Collectors.toMap((c) -> c.name, (c) -> c))); } diff --git a/tis-incr/tis-realtime-flink/src/main/java/com/qlangtech/tis/realtime/BasicTISSinkFactory.java b/tis-incr/tis-realtime-flink/src/main/java/com/qlangtech/tis/realtime/BasicTISSinkFactory.java index 81a1ca032..fc5e0a3e5 100644 --- a/tis-incr/tis-realtime-flink/src/main/java/com/qlangtech/tis/realtime/BasicTISSinkFactory.java +++ b/tis-incr/tis-realtime-flink/src/main/java/com/qlangtech/tis/realtime/BasicTISSinkFactory.java @@ -20,17 +20,17 @@ import com.qlangtech.plugins.incr.flink.cdc.FlinkCol; import com.qlangtech.tis.async.message.client.consumer.IFlinkColCreator; -import com.qlangtech.tis.datax.IDataXNameAware; import com.qlangtech.tis.datax.IDataxProcessor; import com.qlangtech.tis.datax.TableAlias; import com.qlangtech.tis.plugin.datax.transformer.RecordTransformerRules; +import com.qlangtech.tis.plugin.ds.ISelectedTab; import com.qlangtech.tis.plugin.incr.TISSinkFactory; import com.qlangtech.tis.plugins.incr.flink.cdc.DTO2RowDataMapper; -import com.qlangtech.tis.plugins.incr.flink.cdc.ReocrdTransformerMapper; import com.qlangtech.tis.plugins.incr.flink.cdc.impl.RowDataTransformerMapper; import com.qlangtech.tis.realtime.dto.DTOStream; import com.qlangtech.tis.realtime.transfer.DTO; import com.qlangtech.tis.util.IPluginContext; +import org.apache.commons.lang3.tuple.Triple; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.table.connector.sink.SinkFunctionProvider; @@ -89,24 +89,32 @@ protected DataStream streamMap(DTOStream sourceStream) { */ public final static class RowDataSinkFunc extends TabSinkFunc { - private final Optional transformers; - // private final IFlinkColCreator flinkColCreator; + public static Optional>> + createTransformerRules(String dataXName, TableAlias tabAlias, ISelectedTab tab, IFlinkColCreator sourceFlinkColCreator) { - public RowDataSinkFunc(IDataXNameAware dataXName, TableAlias tab - , SinkFunction sinkFunction, List primaryKeys, List colsMeta - , boolean supportUpset, int sinkTaskParallelism) { - this(dataXName, tab, sinkFunction, primaryKeys, colsMeta, colsMeta, supportUpset, sinkTaskParallelism); + RecordTransformerRules transformerRules = RecordTransformerRules.loadTransformerRules(IPluginContext.namedContext(dataXName), tabAlias.getFrom()); + Optional>> transformerOpt + = (transformerRules != null) ? Optional.of(Triple.of(transformerRules, tab, sourceFlinkColCreator)) : Optional.empty(); + return transformerOpt; } + // private final IFlinkColCreator flinkColCreator; - public RowDataSinkFunc(IDataXNameAware dataXName, TableAlias tab +// public RowDataSinkFunc(TableAlias tab +// , SinkFunction sinkFunction, List primaryKeys, List colsMeta +// , boolean supportUpset, int sinkTaskParallelism) { +// this(tab, sinkFunction, primaryKeys, colsMeta, colsMeta, supportUpset, sinkTaskParallelism, Optional.empty()); +// } + + + public RowDataSinkFunc(TableAlias tab , SinkFunction sinkFunction // , List primaryKeys // , final List sourceColsMeta // , List sinkColsMeta // - , boolean supportUpset, int sinkTaskParallelism) { - super(tab, primaryKeys, sinkFunction, sourceColsMeta, sinkColsMeta, sinkTaskParallelism); - this.transformers - = Optional.ofNullable(RecordTransformerRules.loadTransformerRules(IPluginContext.namedContext(dataXName.getTISDataXName()), tab.getFrom())); + , boolean supportUpset, int sinkTaskParallelism, Optional>> transformerOpt) { + super(tab, primaryKeys, sinkFunction, sourceColsMeta, sinkColsMeta, sinkTaskParallelism + , transformerOpt); + //this.flinkColCreator = Objects.requireNonNull(flinkColCreator, "flinkColCreator can not be null"); if (supportUpset) { this.setSourceFilter("skipUpdateBeforeEvent" @@ -136,7 +144,13 @@ protected DataStream streamMap(DTOStream sourceStream) { } if (transformers.isPresent()) { - return result.map(new RowDataTransformerMapper(this.sourceColsMeta, transformers.get())) + Triple> triple = transformers.get(); + RecordTransformerRules rule = triple.getLeft(); + ISelectedTab table = triple.getMiddle(); + + return result.map(new RowDataTransformerMapper( + FlinkCol.getAllTabColsMeta(rule.overwriteCols(table.getCols()) + , Objects.requireNonNull(triple.getRight(), "flinkColCreator")), triple.getLeft())) .name(tab.getFrom() + "_transformer").setParallelism(this.sinkTaskParallelism); } else { return result; diff --git a/tis-incr/tis-realtime-flink/src/main/java/com/qlangtech/tis/realtime/TableRegisterFlinkSourceHandle.java b/tis-incr/tis-realtime-flink/src/main/java/com/qlangtech/tis/realtime/TableRegisterFlinkSourceHandle.java index 8ad5d2cb2..24d1c921e 100644 --- a/tis-incr/tis-realtime-flink/src/main/java/com/qlangtech/tis/realtime/TableRegisterFlinkSourceHandle.java +++ b/tis-incr/tis-realtime-flink/src/main/java/com/qlangtech/tis/realtime/TableRegisterFlinkSourceHandle.java @@ -217,11 +217,11 @@ protected void registerSourceTable(StreamTableEnvironment tabEnv if (transformers.isPresent()) { tRules = transformers.get(); srcCols = tRules.overwriteCols(srcCols); - cols = AbstractRowDataMapper.getAllTabColsMeta(srcCols, Objects.requireNonNull(this.flinkColCreator, "flinkColCreator")); + cols = FlinkCol.getAllTabColsMeta(srcCols, Objects.requireNonNull(this.flinkColCreator, "flinkColCreator")); transformerMapper = new RowTransformerMapper(cols, transformers.get()); outputTypeSchema = RowUtils.outputTypeSchema(transformerMapper.cols, selectedTab.getPrimaryKeys()); } else { - cols = AbstractRowDataMapper.getAllTabColsMeta(srcCols, Objects.requireNonNull(this.flinkColCreator, "flinkColCreator")); + cols = FlinkCol.getAllTabColsMeta(srcCols, Objects.requireNonNull(this.flinkColCreator, "flinkColCreator")); outputTypeSchema = RowUtils.outputTypeSchema(cols, selectedTab.getPrimaryKeys()); } diff --git a/tis-incr/tis-sink-elasticsearch7-plugin/src/main/java/com/qlangtech/tis/plugins/incr/flink/connector/elasticsearch7/ElasticSearchSinkFactory.java b/tis-incr/tis-sink-elasticsearch7-plugin/src/main/java/com/qlangtech/tis/plugins/incr/flink/connector/elasticsearch7/ElasticSearchSinkFactory.java index d6bf99ac2..11842bd97 100644 --- a/tis-incr/tis-sink-elasticsearch7-plugin/src/main/java/com/qlangtech/tis/plugins/incr/flink/connector/elasticsearch7/ElasticSearchSinkFactory.java +++ b/tis-incr/tis-sink-elasticsearch7-plugin/src/main/java/com/qlangtech/tis/plugins/incr/flink/connector/elasticsearch7/ElasticSearchSinkFactory.java @@ -42,16 +42,17 @@ import com.qlangtech.tis.plugin.annotation.Validator; import com.qlangtech.tis.plugin.datax.DataXElasticsearchWriter; import com.qlangtech.tis.plugin.datax.elastic.ElasticEndpoint; -import com.qlangtech.tis.plugin.ds.CMeta; +import com.qlangtech.tis.plugin.datax.transformer.RecordTransformerRules; import com.qlangtech.tis.plugin.ds.IColMetaGetter; import com.qlangtech.tis.plugin.ds.ISelectedTab; import com.qlangtech.tis.plugins.incr.flink.cdc.AbstractRowDataMapper; import com.qlangtech.tis.realtime.BasicTISSinkFactory; import com.qlangtech.tis.realtime.TabSinkFunc; import com.qlangtech.tis.runtime.module.misc.IControlMsgHandler; -import org.apache.commons.collections.CollectionUtils; +import com.qlangtech.tis.util.IPluginContext; import org.apache.commons.compress.utils.Lists; import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang3.tuple.Triple; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler; import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; @@ -90,13 +91,13 @@ public class ElasticSearchSinkFactory extends BasicTISSinkFactory { private static final Logger logger = LoggerFactory.getLogger(ElasticSearchSinkFactory.class); private static final int DEFAULT_PARALLELISM = 1;// parallelism // bulk.flush.max.actions - @FormField(ordinal = 0, type = FormFieldType.INT_NUMBER, validate = Validator.integer) + @FormField(ordinal = 1, advance = true, type = FormFieldType.INT_NUMBER, validate = Validator.integer) public Integer bulkFlushMaxActions; - @FormField(ordinal = 1, type = FormFieldType.INT_NUMBER, validate = Validator.integer) + @FormField(ordinal = 2, advance = true, type = FormFieldType.INT_NUMBER, validate = Validator.integer) public Integer bulkFlushMaxSizeMb; - @FormField(ordinal = 2, type = FormFieldType.INT_NUMBER, validate = {Validator.integer, Validator.require}) + @FormField(ordinal = 0, type = FormFieldType.INT_NUMBER, validate = {Validator.integer, Validator.require}) public Integer bulkFlushIntervalMs; static { @@ -166,15 +167,15 @@ public Map> createSinkFunction(IDataxProcessor IFlinkColCreator flinkColCreator = AbstractRowDataMapper::mapFlinkCol; Objects.requireNonNull(tab, "tab ca not be null"); - final List sourceColsMeta = AbstractRowDataMapper.getAllTabColsMeta(tab.getCols(), sourceFlinkColCreator); - final List sinkColsMeta = AbstractRowDataMapper.getAllTabColsMeta(sinkMcols, flinkColCreator); + + final List sinkColsMeta = FlinkCol.getAllTabColsMeta(sinkMcols, flinkColCreator); ElasticsearchSink.Builder sinkBuilder = new ElasticsearchSink.Builder<>(transportAddresses , new DefaultElasticsearchSinkFunction( esCols.stream().map((c) -> c.getName()).collect(Collectors.toSet()) , sinkColsMeta //, firstPK.get().getName() - ,primaryKeys.get(0) + , primaryKeys.get(0) , dataXWriter.getIndexName())); if (this.bulkFlushMaxActions != null) { @@ -203,12 +204,16 @@ public Void visit(UsernamePassword accessKey) { return null; } }); + final List sourceColsMeta = FlinkCol.getAllTabColsMeta(tab.getCols(), sourceFlinkColCreator); + + Optional>> transformerOpt + = RowDataSinkFunc.createTransformerRules(dataxProcessor.identityValue(), esSchema, tab, sourceFlinkColCreator); return Collections.singletonMap(esSchema - , new RowDataSinkFunc(this, esSchema, sinkBuilder.build(), primaryKeys + , new RowDataSinkFunc(esSchema, sinkBuilder.build(), primaryKeys , sourceColsMeta , sinkColsMeta - , true, DEFAULT_PARALLELISM)); + , true, DEFAULT_PARALLELISM, transformerOpt)); }