Skip to content

Commit

Permalink
add date type for sqlserver DDL builder
Browse files Browse the repository at this point in the history
  • Loading branch information
baisui1981 committed Nov 1, 2024
1 parent b167a02 commit ddd052d
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

/**
* @author: baisui 百岁
Expand Down Expand Up @@ -67,15 +68,20 @@ public CreateTableSqlBuilder.CreateDDL generateCreateDDL(
// if (!this.autoCreateTable) {
// return null;
// }

// https://www.cnblogs.com/mingfei200169/articles/427591.html
final CreateTableSqlBuilder createTableSqlBuilder
= new CreateTableSqlBuilder<ColWrapper>(tableMapper, this.getDataSourceFactory(), transformers) {

private boolean isMulitPks() {
return this.pks.size() > 1;
}

private String convertType(DataType type, boolean isPk) {
//https://www.cnblogs.com/liberty777/p/10748570.html
StringBuffer createSql = new StringBuffer(getSqlServerType(type));

if (isPk) {
if (!this.isMulitPks() && isPk) {
createSql.append(" primary key ");
}
return createSql.toString();
Expand Down Expand Up @@ -105,6 +111,7 @@ private String getSqlServerType(DataType type) {
case DECIMAL:
return "decimal(" + type.getColumnSize() + ", " + type.getDecimalDigits() + ")";
case DATE:
return "date";
case TIME:
case TIMESTAMP:
return "datetime";
Expand All @@ -129,7 +136,40 @@ private String getSqlServerType(DataType type) {

@Override
protected void appendExtraColDef(List<String> pk) {

if (this.isMulitPks()) {
/**
* 建表语句中不能有超过一个列的修饰符为 “primary key”
* <pre>
* CREATE TABLE "base"
* (
* "base_id" int primary key ,
* "start_time" datetime,
* "update_date" datetime primary key ,
* "update_time" datetime,
* "price" decimal(5, 2),
* "json_content" varchar(2000),
* "col_blob" varbinary(8000),
* "col_text" text
* )
* </pre>
* 应该改为:
* <pre>
* CREATE TABLE "base"
* (
* "base_id" int ,
* "start_time" datetime,
* "update_date" datetime ,
* "update_time" datetime,
* "price" decimal(5, 2),
* "json_content" varchar(2000),
* "col_blob" varbinary(8000),
* "col_text" text
* ,PRIMARY KEY ( "base_id" , "update_date")
* )
* </pre>
*/
script.appendLine(",PRIMARY KEY ( " + pk.stream().map((key) -> this.dsMeta.getEscapedEntity(key)).collect(Collectors.joining(",")) + " )");
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public abstract class SqlServerDatasourceFactory extends BasicDataSourceFactory

@Override
public String buidJdbcUrl(DBConfig db, String ip, String dbName) {
String jdbcUrl = "jdbc:sqlserver://" + ip + ":" + this.port + ";databaseName=" + dbName;// + ";user=" + this.userName + ";password=" + password;
String jdbcUrl = "jdbc:sqlserver://" + ip + ":" + this.port + ";databaseName=" + dbName;
if (StringUtils.isNotEmpty(this.extraParams)) {
jdbcUrl = jdbcUrl + ";" + this.extraParams;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,11 @@ public void testGenerateCreateDDL() {

DataXSqlserverWriter writer = getDataXSqlserverWriter();
Optional<IDataxProcessor.TableMap> tableMap = TestSelectedTabs.createTableMapper();
CreateTableSqlBuilder.CreateDDL createDDL = writer.generateCreateDDL(tableMap.get());
CreateTableSqlBuilder.CreateDDL createDDL = writer.generateCreateDDL(tableMap.get(),Optional.empty());
assertNull(createDDL);

writer.autoCreateTable = true;
createDDL = writer.generateCreateDDL(tableMap.get());
createDDL = writer.generateCreateDDL(tableMap.get(),Optional.empty());
assertNotNull(createDDL);

assertEquals("CREATE TABLE orderinfo_new\n" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,11 @@ public RowDataSinkFunc createRowDataSinkFunc(IDataxProcessor dataxProcessor
// , sourceColsMeta
, AbstractRowDataMapper.getAllTabColsMeta(Objects.requireNonNull(sinkFunc.tableCols, "tabCols can not be null").getCols())
, supportUpsetDML()
, this.parallelism, RowDataSinkFunc.createTransformerRules(dataxProcessor.identityValue(), tabName, tab, sourceFlinkColCreator));
, this.parallelism
, RowDataSinkFunc.createTransformerRules(dataxProcessor.identityValue()
, tabName
, tab
, Objects.requireNonNull(sourceFlinkColCreator, "sourceFlinkColCreator can not be null")));
}


Expand Down Expand Up @@ -318,7 +322,7 @@ protected final SyncConf createSyncConf(SelectedTab tab, String targetTabName, S
col.put("type", parseType(cm));
cols.add(col);
}
params.put(ConfigConstant.KEY_COLUMN, cols);
params.put(ConfigConstant.KEY_COLUMN, cols);
params.put(KEY_FULL_COLS, colMetasMap.getCols().stream().map((c) -> c.getName()).collect(Collectors.toList()));
// params.put(KEY_FULL_COLS, tab.getCols().stream().map((c) -> c.getName()).collect(Collectors.toList()));
params.put("batchSize", this.batchSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.qlangtech.plugins.incr.flink.cdc.FlinkCol;
import com.qlangtech.plugins.incr.flink.cdc.IResultRows;
import com.qlangtech.plugins.incr.flink.junit.TISApplySkipFlinkClassloaderFactoryCreation;
import com.qlangtech.tis.async.message.client.consumer.IFlinkColCreator;
import com.qlangtech.tis.async.message.client.consumer.IMQListener;
import com.qlangtech.tis.async.message.client.consumer.Tab2OutputTag;
import com.qlangtech.tis.async.message.client.consumer.impl.MQListenerFactory;
import com.qlangtech.tis.datax.DataXCfgFile;
import com.qlangtech.tis.datax.IDataxProcessor;
import com.qlangtech.tis.datax.IDataxReader;
Expand All @@ -45,6 +48,7 @@
import com.qlangtech.tis.plugin.ds.ISelectedTab;
import com.qlangtech.tis.plugin.ds.JDBCTypes;
import com.qlangtech.tis.plugin.incr.TISSinkFactory;
import com.qlangtech.tis.plugins.incr.flink.cdc.AbstractRowDataMapper;
import com.qlangtech.tis.plugins.incr.flink.chunjun.sink.SinkTabPropsExtends;
import com.qlangtech.tis.plugins.incr.flink.connector.ChunjunSinkFactory;
import com.qlangtech.tis.plugins.incr.flink.connector.UpdateMode;
Expand All @@ -54,6 +58,7 @@
import com.qlangtech.tis.realtime.dto.DTOStream;
import com.qlangtech.tis.realtime.transfer.DTO;
import com.qlangtech.tis.test.TISEasyMock;
import com.qlangtech.tis.util.HeteroEnum;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
Expand Down Expand Up @@ -98,7 +103,7 @@ public abstract class TestFlinkSinkExecutor extends AbstractTestBase implements
protected static final String dbName = "tis";

String colEntityId = "entity_id";
String colNum = "num";
protected String colNum = "num";
static protected String colId = "id";
String colCreateTime = "create_time";
protected String updateTime = "update_time";
Expand Down Expand Up @@ -155,7 +160,7 @@ public void test() throws Exception {

}

private int updateNumVal = 999;
protected int updateNumVal = 999;

protected DTO[] createTestDTO() {
return createTestDTO(true);
Expand Down Expand Up @@ -192,8 +197,10 @@ protected void testSinkSync() throws Exception {

IFlinkColCreator flinkColCreator = null;
Map<TableAlias, TabSinkFunc<RowData>> sinkFunction = sinkFactory.createSinkFunction(dataxProcessor, flinkColCreator);
//int updateNumVal = 999;
Assert.assertEquals(1, sinkFunction.size());
this.startTestSinkSync(sinkFunction);


for (Map.Entry<TableAlias, TabSinkFunc<RowData>> entry : sinkFunction.entrySet()) {

Pair<DTOStream, ReaderSource<DTO>> sourceStream = createReaderSource(env, entry.getKey());
Expand All @@ -212,6 +219,9 @@ protected void testSinkSync() throws Exception {
});
}

protected void startTestSinkSync(Map<TableAlias, TabSinkFunc<RowData>> sinkFunction) {
}

// @Test
protected void testSinkSync(IStreamScriptRun streamScriptRun) throws Exception {

Expand Down Expand Up @@ -248,6 +258,7 @@ DISTRIBUTED BY HASH(`id`) BUCKETS 10
mapper.put(tableName, new TableAlias(tableName));
TableAliasMapper aliasMapper = new TableAliasMapper(mapper);
EasyMock.expect(dataxProcessor.getTabAlias(null)).andReturn(aliasMapper);
EasyMock.expect(dataxProcessor.identityValue()).andReturn(dataXName).anyTimes();

File ddlDir = folder.newFolder("ddl");
String tabSql = tableName + DataXCfgFile.DATAX_CREATE_DDL_FILE_NAME_SUFFIX;
Expand All @@ -258,6 +269,21 @@ DISTRIBUTED BY HASH(`id`) BUCKETS 10
DataxProcessor.processorGetter = (name) -> {
return dataxProcessor;
};
HeteroEnum.incrSourceListenerFactoryStub = (dataX) -> {

MQListenerFactory mockIncrSourceFactory = new MQListenerFactory() {
@Override
public IFlinkColCreator<FlinkCol> createFlinkColCreator() {
return AbstractRowDataMapper::mapFlinkCol;
}

@Override
public IMQListener create() {
throw new UnsupportedOperationException();
}
};
return mockIncrSourceFactory;
};
IDataxReader dataxReader = createDataxReader();
List<ISelectedTab> selectedTabs = Lists.newArrayList();

Expand Down Expand Up @@ -482,7 +508,7 @@ protected UpdateMode createIncrMode() {

protected abstract DataxWriter createDataXWriter();

private DTO createDTO(DTO.EventType eventType, Consumer<Map<String, Object>>... consumer) {
protected DTO createDTO(DTO.EventType eventType, Consumer<Map<String, Object>>... consumer) {
DTO d = new DTO();
d.setEventType(eventType);
d.setTableName(tableName);
Expand Down

0 comments on commit ddd052d

Please sign in to comment.