Skip to content

Commit

Permalink
add transformer for TIS adapter in reltine flink module
Browse files Browse the repository at this point in the history
  • Loading branch information
baisui1981 committed Jun 19, 2024
1 parent 6836344 commit ab5fab8
Show file tree
Hide file tree
Showing 18 changed files with 161 additions and 92 deletions.
Original file line number Diff line number Diff line change
@@ -1,24 +1,25 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// */
package com.qlangtech.async.message.client.consumer;

import com.qlangtech.tis.annotation.Public;
import com.qlangtech.tis.async.message.client.consumer.IFlinkColCreator;
import com.qlangtech.tis.async.message.client.consumer.IMQListener;
import com.qlangtech.tis.async.message.client.consumer.impl.AbstractAsyncMsgDeserialize;
import com.qlangtech.tis.async.message.client.consumer.impl.MQListenerFactory;
Expand Down Expand Up @@ -142,17 +143,26 @@ public void setDeserialize(AbstractAsyncMsgDeserialize deserialize) {
this.deserialize = deserialize;
}


@Override
public <FlinkColType> IFlinkColCreator<FlinkColType> createFlinkColCreator() {
// return AbstractRowDataMapper::mapFlinkCol;;
throw new UnsupportedOperationException();
}

@TISExtension(ordinal = 0)
public static class DefaultDescriptor extends BaseDescriptor {

@Override
public String getDisplayName() {
return "RocketMq";
}

@Override
public PluginVender getVender() {
return PluginVender.TIS;
}

@Override
public IEndTypeGetter.EndType getEndType() {
return IEndTypeGetter.EndType.RocketMQ;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,13 @@
import com.qlangtech.tis.plugin.datax.SelectedTab;
import com.qlangtech.tis.plugin.datax.SelectedTabExtend;
import com.qlangtech.tis.plugin.datax.common.BasicDataXRdbmsWriter;
import com.qlangtech.tis.plugin.datax.transformer.RecordTransformerRules;
import com.qlangtech.tis.plugin.ds.BasicDataSourceFactory;
import com.qlangtech.tis.plugin.ds.CMeta;
import com.qlangtech.tis.plugin.ds.DBConfig;
import com.qlangtech.tis.plugin.ds.DataSourceFactory;
import com.qlangtech.tis.plugin.ds.DataSourceMeta;
import com.qlangtech.tis.plugin.ds.IColMetaGetter;
import com.qlangtech.tis.plugin.ds.ISelectedTab;
import com.qlangtech.tis.plugin.incr.CreatedSinkFunction;
import com.qlangtech.tis.plugin.incr.ISelectedTabExtendFactory;
import com.qlangtech.tis.plugins.incr.flink.cdc.AbstractRowDataMapper;
import com.qlangtech.tis.plugins.incr.flink.chunjun.common.DialectUtils;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,31 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.qlangtech.plugins.incr.flink.cdc.mongdb;

import com.qlangtech.plugins.incr.flink.cdc.FlinkCol;
import com.qlangtech.tis.annotation.Public;
import com.qlangtech.tis.async.message.client.consumer.IConsumerHandle;
import com.qlangtech.tis.async.message.client.consumer.IFlinkColCreator;
import com.qlangtech.tis.async.message.client.consumer.IMQListener;
import com.qlangtech.tis.async.message.client.consumer.impl.MQListenerFactory;
import com.qlangtech.tis.plugin.IEndTypeGetter;
import com.qlangtech.tis.plugin.annotation.FormField;
import com.qlangtech.tis.plugin.annotation.FormFieldType;
import com.qlangtech.tis.plugin.annotation.Validator;
import com.qlangtech.tis.plugins.incr.flink.cdc.AbstractRowDataMapper;

/**
* https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mongodb-cdc.html
Expand Down Expand Up @@ -68,6 +71,11 @@ public class FlinkCDCMongoDBSourceFactory extends MQListenerFactory {

private transient IConsumerHandle consumerHandle;

@Override
public IFlinkColCreator<FlinkCol> createFlinkColCreator() {
return AbstractRowDataMapper::mapFlinkCol;
}

@Override
public IMQListener create() {
FlinkCDCMongoDBSourceFunction sourceFunction = new FlinkCDCMongoDBSourceFunction(this);
Expand Down Expand Up @@ -97,6 +105,7 @@ public String getDisplayName() {
public PluginVender getVender() {
return PluginVender.FLINK_CDC;
}

@Override
public IEndTypeGetter.EndType getEndType() {
return IEndTypeGetter.EndType.MongoDB;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public JobExecutionResult start(TargetResName dataxName, IDataxReader dataSource
SourceChannel sourceChannel = new SourceChannel(sourceFunctions);

sourceChannel.setFocusTabs(tabs, dataXProcessor.getTabAlias(null), DTOStream::createDispatched);
IFlinkColCreator<FlinkCol> flinkColCreator = null;
IFlinkColCreator<FlinkCol> flinkColCreator = this.sourceFactory.createFlinkColCreator();
return (JobExecutionResult) getConsumerHandle().consume(dataxName, sourceChannel, dataXProcessor, flinkColCreator);
} catch (Exception e) {
throw new MQConsumeException(e.getMessage(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.qlangtech.plugins.incr.flink.cdc.SourceChannel.HostDbs;
import com.qlangtech.tis.annotation.Public;
import com.qlangtech.tis.async.message.client.consumer.IConsumerHandle;
import com.qlangtech.tis.async.message.client.consumer.IFlinkColCreator;
import com.qlangtech.tis.async.message.client.consumer.IMQListener;
import com.qlangtech.tis.async.message.client.consumer.impl.MQListenerFactory;
import com.qlangtech.tis.datax.impl.DataxReader;
Expand All @@ -36,6 +37,7 @@
import com.qlangtech.tis.plugin.ds.DataSourceFactory;
import com.qlangtech.tis.plugin.ds.IDataSourceFactoryGetter;
import com.qlangtech.tis.plugin.ds.ISelectedTab;
import com.qlangtech.tis.plugins.incr.flink.cdc.mysql.FlinkCDCMysqlSourceFunction.MySQLCDCTypeVisitor;
import com.qlangtech.tis.plugins.incr.flink.cdc.mysql.FlinkCDCMysqlSourceFunction.MySQLReaderSourceCreator;
import com.qlangtech.tis.realtime.ReaderSource;
import com.qlangtech.tis.realtime.transfer.DTO;
Expand All @@ -46,6 +48,7 @@
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.flink.table.api.ValidationException;
import com.qlangtech.plugins.incr.flink.cdc.FlinkCol;

import java.util.List;
import java.util.Objects;
Expand All @@ -62,6 +65,15 @@ public class FlinkCDCMySQLSourceFactory extends MQListenerFactory {
@FormField(ordinal = 0, type = FormFieldType.ENUM, validate = {Validator.require})
public com.qlangtech.tis.plugins.incr.flink.cdc.mysql.startup.StartupOptions startupOptions;


@Override
public IFlinkColCreator<FlinkCol> createFlinkColCreator() {
final IFlinkColCreator flinkColCreator = (meta, colIndex) -> {
return meta.getType().accept(new MySQLCDCTypeVisitor(meta, colIndex));
};
return flinkColCreator;
}

/**
* binlog监听在独立的slot中执行
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,7 @@ public JobExecutionResult start(TargetResName dataxName, IDataxReader dataSource
BasicDataSourceFactory dsFactory = (BasicDataSourceFactory) rdbmsReader.getDataSourceFactory();
Map<String, FlinkColMapper> tabColsMapper = Maps.newHashMap();
TableInDB tablesInDB = dsFactory.getTablesInDB();
final IFlinkColCreator flinkColCreator = (meta, colIndex) -> {
return meta.getType().accept(new MySQLCDCTypeVisitor(meta, colIndex));
};
IFlinkColCreator<FlinkCol> flinkColCreator = sourceFactory.createFlinkColCreator();
for (ISelectedTab tab : tabs) {
FlinkColMapper colsMapper
= AbstractRowDataMapper.getAllTabColsMetaMapper(tab.getCols(), flinkColCreator);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@

package com.qlangtech.plugins.incr.flink.cdc.oracle;

import com.qlangtech.plugins.incr.flink.cdc.FlinkCol;
import com.qlangtech.tis.annotation.Public;
import com.qlangtech.tis.async.message.client.consumer.IConsumerHandle;
import com.qlangtech.tis.async.message.client.consumer.IFlinkColCreator;
import com.qlangtech.tis.async.message.client.consumer.IMQListener;
import com.qlangtech.tis.async.message.client.consumer.impl.MQListenerFactory;
import com.qlangtech.tis.plugin.IEndTypeGetter;
import com.qlangtech.tis.plugin.annotation.FormField;
import com.qlangtech.tis.plugin.annotation.FormFieldType;
import com.qlangtech.tis.plugin.annotation.Validator;
import com.qlangtech.tis.plugins.incr.flink.cdc.AbstractRowDataMapper;
import com.ververica.cdc.connectors.base.options.StartupOptions;

import java.util.Objects;
Expand All @@ -43,6 +46,11 @@ public class FlinkCDCOracleSourceFactory extends MQListenerFactory {
@FormField(ordinal = 0, type = FormFieldType.ENUM, validate = {Validator.require})
public String startupOptions;

@Override
public final IFlinkColCreator<FlinkCol> createFlinkColCreator() {
return AbstractRowDataMapper::mapFlinkCol;
}

StartupOptions getStartupOptions() {
switch (startupOptions) {
case "latest":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,8 @@ public JobExecutionResult start(TargetResName dataxName, IDataxReader dataSource
if (StringUtils.isEmpty(schemaSupported.getDBSchema())) {
throw new IllegalStateException("dsFactory:" + dsFactory.dbName + " relevant dbSchema can not be null");
}
IFlinkColCreator<FlinkCol> flinkColCreator = (meta, colIndex) -> {
return meta.getType().accept(new PGCDCTypeVisitor(meta, colIndex));
};

final IFlinkColCreator<FlinkCol> flinkColCreator = this.sourceFactory.createFlinkColCreator();

List<ReaderSource> readerSources = SourceChannel.getSourceFunction(
dsFactory, tabs, (dbHost, dbs, tbs, debeziumProperties) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@

package com.qlangtech.plugins.incr.flink.cdc.postgresql;

import com.qlangtech.plugins.incr.flink.cdc.FlinkCol;
import com.qlangtech.plugins.incr.flink.cdc.postgresql.PGDTOColValProcess.PGCDCTypeVisitor;
import com.qlangtech.tis.annotation.Public;
import com.qlangtech.tis.async.message.client.consumer.IConsumerHandle;
import com.qlangtech.tis.async.message.client.consumer.IFlinkColCreator;
import com.qlangtech.tis.async.message.client.consumer.IMQListener;
import com.qlangtech.tis.async.message.client.consumer.impl.MQListenerFactory;
import com.qlangtech.tis.extension.TISExtension;
Expand Down Expand Up @@ -50,6 +53,14 @@ public class FlinkCDCPostreSQLSourceFactory extends MQListenerFactory {
// * values are decoderbufs, wal2json, wal2json_rds, wal2json_streaming,
// * wal2json_rds_streaming and pgoutput.

@Override
public IFlinkColCreator<FlinkCol> createFlinkColCreator() {
IFlinkColCreator<FlinkCol> flinkColCreator = (meta, colIndex) -> {
return meta.getType().accept(new PGCDCTypeVisitor(meta, colIndex));
};
return flinkColCreator;
}

/**
* The name of the Postgres logical decoding plug-in installed on the server. Supported values are decoderbufs, wal2json, wal2json_rds, wal2json_streaming, wal2json_rds_streaming and pgoutput.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.common.collect.Sets;
import com.qlangtech.plugins.incr.flink.cdc.IResultRows;
import com.qlangtech.plugins.incr.flink.junit.TISApplySkipFlinkClassloaderFactoryCreation;
import com.qlangtech.tis.async.message.client.consumer.IFlinkColCreator;
import com.qlangtech.tis.async.message.client.consumer.Tab2OutputTag;
import com.qlangtech.tis.datax.IDataxReader;
import com.qlangtech.tis.datax.TableAlias;
Expand Down Expand Up @@ -155,7 +156,8 @@ public void testCreateSinkFunction() throws Exception {
SelectedTab totalpayinfo = mock(tableName, SelectedTab.class);
SinkTabPropsExtends sinkExt = new SinkTabPropsExtends();
sinkExt.tabName = tableName;
sinkExt.uniqueKey = Collections.singletonList(colId);
totalpayinfo.primaryKeys = Collections.singletonList(colId);
// sinkExt.uniqueKey =
// ReplaceType replaceMode = new ReplaceType();
// replaceMode.updateKey = Collections.singletonList(colId);
InsertType insertType = new InsertType();
Expand Down Expand Up @@ -225,9 +227,9 @@ public ClickHouseDataSourceFactory getDataSourceFactory() {
clickHouseSinkFactory.batchSize = 100;
clickHouseSinkFactory.parallelism = 1;
clickHouseSinkFactory.semantic = "at-least-once";

IFlinkColCreator flinkColCreator = null;
Map<TableAlias, TabSinkFunc<RowData>>
sinkFuncs = clickHouseSinkFactory.createSinkFunction(dataxProcessor);
sinkFuncs = clickHouseSinkFactory.createSinkFunction(dataxProcessor, flinkColCreator);
Assert.assertTrue(sinkFuncs.size() > 0);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Expand Down Expand Up @@ -274,7 +276,7 @@ public ClickHouseDataSourceFactory getDataSourceFactory() {
});

try {
try (DataSourceMeta.JDBCConnection conn = sourceFactory.getConnection(jdbcUrls[0])) {
try (DataSourceMeta.JDBCConnection conn = sourceFactory.getConnection(jdbcUrls[0], false)) {
Statement statement = conn.createStatement();
//+ " where id='" + colIdVal + "'"
ResultSet resultSet = statement.executeQuery("select * from " + jdbcUrls[1] + "." + tableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@

package com.qlangtech.plugins.incr.flink.chunjun.dameng.source;

import com.qlangtech.tis.extension.TISExtension;
import com.qlangtech.tis.plugins.incr.flink.chunjun.source.ChunjunSourceFactory;
import com.qlangtech.tis.async.message.client.consumer.IFlinkColCreator;
import com.qlangtech.tis.async.message.client.consumer.IMQListener;
import com.qlangtech.tis.plugin.IEndTypeGetter;
import com.qlangtech.tis.plugins.incr.flink.chunjun.source.ChunjunSourceFactory;

/**
* @author: 百岁([email protected]
Expand All @@ -33,7 +33,12 @@ public IMQListener create() {
return new DamengSourceFunction(this);
}

// @TISExtension
@Override
public <FlinkColType> IFlinkColCreator<FlinkColType> createFlinkColCreator() {
throw new UnsupportedOperationException();
}

// @TISExtension
public static class DftDesc extends BaseChunjunDescriptor {
@Override
public IEndTypeGetter.EndType getEndType() {
Expand Down
Loading

0 comments on commit ab5fab8

Please sign in to comment.