From bf1f1b95c7e171a2941408089d0d9f51132ed0b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=83=E9=94=8B?= Date: Sat, 27 Oct 2018 15:59:11 +0800 Subject: [PATCH] fixed drds support --- .../spring/tsdb/sql-map/sqlmap_history.xml | 4 +- .../spring/tsdb/sql-map/sqlmap_snapshot.xml | 4 +- .../etl/select/selector/MessageParser.java | 48 +++++++++++-------- 3 files changed, 31 insertions(+), 25 deletions(-) diff --git a/node/canal/src/main/resources/spring/tsdb/sql-map/sqlmap_history.xml b/node/canal/src/main/resources/spring/tsdb/sql-map/sqlmap_history.xml index 7445bf69..58957c28 100644 --- a/node/canal/src/main/resources/spring/tsdb/sql-map/sqlmap_history.xml +++ b/node/canal/src/main/resources/spring/tsdb/sql-map/sqlmap_history.xml @@ -36,10 +36,10 @@ - + \ No newline at end of file diff --git a/node/canal/src/main/resources/spring/tsdb/sql-map/sqlmap_snapshot.xml b/node/canal/src/main/resources/spring/tsdb/sql-map/sqlmap_snapshot.xml index 79aa25fe..2ee0f8f4 100644 --- a/node/canal/src/main/resources/spring/tsdb/sql-map/sqlmap_snapshot.xml +++ b/node/canal/src/main/resources/spring/tsdb/sql-map/sqlmap_snapshot.xml @@ -42,10 +42,10 @@ where destination=#destination# - + 0 ]]> \ No newline at end of file diff --git a/node/etl/src/main/java/com/alibaba/otter/node/etl/select/selector/MessageParser.java b/node/etl/src/main/java/com/alibaba/otter/node/etl/select/selector/MessageParser.java index f88ebde4..18b59c25 100644 --- a/node/etl/src/main/java/com/alibaba/otter/node/etl/select/selector/MessageParser.java +++ b/node/etl/src/main/java/com/alibaba/otter/node/etl/select/selector/MessageParser.java @@ -47,6 +47,7 @@ import com.alibaba.otter.shared.common.model.config.data.DataMedia; import com.alibaba.otter.shared.common.model.config.data.DataMedia.ModeValue; import com.alibaba.otter.shared.common.model.config.data.DataMediaPair; +import com.alibaba.otter.shared.common.model.config.data.DataMediaSource; import com.alibaba.otter.shared.common.model.config.data.db.DbMediaSource; import com.alibaba.otter.shared.common.model.config.pipeline.Pipeline; import com.alibaba.otter.shared.common.model.config.pipeline.PipelineParameter; @@ -439,27 +440,32 @@ private EventData internParse(Pipeline pipeline, Entry entry, RowChange rowChang } // 获取一下目标库的拆分字段,设置源表为主键 // 首先要求源和目标的库名表名是一致的 - DbDialect targetDbDialect = dbDialectFactory.getDbDialect(pipeline.getId(), - (DbMediaSource) targetDataMedia.getSource()); - if (targetDbDialect.isDRDS()) { - String schemaName = buildName(eventData.getSchemaName(), - dataMedia.getNamespaceMode(), - targetDataMedia.getNamespaceMode()); - String tableName = buildName(eventData.getSchemaName(), - dataMedia.getNameMode(), - targetDataMedia.getNameMode()); - String shardColumns = targetDbDialect.getShardColumns(schemaName, tableName); - if (StringUtils.isNotEmpty(shardColumns)) { - String columns[] = StringUtils.split(shardColumns, ','); - for (String key : columns) { - org.apache.ddlutils.model.Column col = table.findColumn(key, false); - if (col != null) { - col.setPrimaryKey(true); - } else { - logger.warn(String.format("shardColumn %s in table[%s.%s] is not found", - key, - eventData.getSchemaName(), - eventData.getTableName())); + DataMediaSource targetSource = targetDataMedia.getSource(); + if (targetSource instanceof DbMediaSource + && StringUtils.containsIgnoreCase(((DbMediaSource) targetSource).getUrl(), "drds")) { + // 优先判断是否为drds + DbDialect targetDbDialect = dbDialectFactory.getDbDialect(pipeline.getId(), + (DbMediaSource) targetDataMedia.getSource()); + if (targetDbDialect.isDRDS()) { + String schemaName = buildName(eventData.getSchemaName(), + dataMedia.getNamespaceMode(), + targetDataMedia.getNamespaceMode()); + String tableName = buildName(eventData.getSchemaName(), + dataMedia.getNameMode(), + targetDataMedia.getNameMode()); + String shardColumns = targetDbDialect.getShardColumns(schemaName, tableName); + if (StringUtils.isNotEmpty(shardColumns)) { + String columns[] = StringUtils.split(shardColumns, ','); + for (String key : columns) { + org.apache.ddlutils.model.Column col = table.findColumn(key, false); + if (col != null) { + col.setPrimaryKey(true); + } else { + logger.warn(String.format("shardColumn %s in table[%s.%s] is not found", + key, + eventData.getSchemaName(), + eventData.getTableName())); + } } } }