diff --git a/node/canal/src/main/resources/spring/tsdb/sql-map/sqlmap_history.xml b/node/canal/src/main/resources/spring/tsdb/sql-map/sqlmap_history.xml
index 7445bf69..58957c28 100644
--- a/node/canal/src/main/resources/spring/tsdb/sql-map/sqlmap_history.xml
+++ b/node/canal/src/main/resources/spring/tsdb/sql-map/sqlmap_history.xml
@@ -36,10 +36,10 @@
-
+
\ No newline at end of file
diff --git a/node/canal/src/main/resources/spring/tsdb/sql-map/sqlmap_snapshot.xml b/node/canal/src/main/resources/spring/tsdb/sql-map/sqlmap_snapshot.xml
index 79aa25fe..2ee0f8f4 100644
--- a/node/canal/src/main/resources/spring/tsdb/sql-map/sqlmap_snapshot.xml
+++ b/node/canal/src/main/resources/spring/tsdb/sql-map/sqlmap_snapshot.xml
@@ -42,10 +42,10 @@
where destination=#destination#
-
+
0
]]>
\ No newline at end of file
diff --git a/node/etl/src/main/java/com/alibaba/otter/node/etl/select/selector/MessageParser.java b/node/etl/src/main/java/com/alibaba/otter/node/etl/select/selector/MessageParser.java
index f88ebde4..18b59c25 100644
--- a/node/etl/src/main/java/com/alibaba/otter/node/etl/select/selector/MessageParser.java
+++ b/node/etl/src/main/java/com/alibaba/otter/node/etl/select/selector/MessageParser.java
@@ -47,6 +47,7 @@
import com.alibaba.otter.shared.common.model.config.data.DataMedia;
import com.alibaba.otter.shared.common.model.config.data.DataMedia.ModeValue;
import com.alibaba.otter.shared.common.model.config.data.DataMediaPair;
+import com.alibaba.otter.shared.common.model.config.data.DataMediaSource;
import com.alibaba.otter.shared.common.model.config.data.db.DbMediaSource;
import com.alibaba.otter.shared.common.model.config.pipeline.Pipeline;
import com.alibaba.otter.shared.common.model.config.pipeline.PipelineParameter;
@@ -439,27 +440,32 @@ private EventData internParse(Pipeline pipeline, Entry entry, RowChange rowChang
}
// 获取一下目标库的拆分字段,设置源表为主键
// 首先要求源和目标的库名表名是一致的
- DbDialect targetDbDialect = dbDialectFactory.getDbDialect(pipeline.getId(),
- (DbMediaSource) targetDataMedia.getSource());
- if (targetDbDialect.isDRDS()) {
- String schemaName = buildName(eventData.getSchemaName(),
- dataMedia.getNamespaceMode(),
- targetDataMedia.getNamespaceMode());
- String tableName = buildName(eventData.getSchemaName(),
- dataMedia.getNameMode(),
- targetDataMedia.getNameMode());
- String shardColumns = targetDbDialect.getShardColumns(schemaName, tableName);
- if (StringUtils.isNotEmpty(shardColumns)) {
- String columns[] = StringUtils.split(shardColumns, ',');
- for (String key : columns) {
- org.apache.ddlutils.model.Column col = table.findColumn(key, false);
- if (col != null) {
- col.setPrimaryKey(true);
- } else {
- logger.warn(String.format("shardColumn %s in table[%s.%s] is not found",
- key,
- eventData.getSchemaName(),
- eventData.getTableName()));
+ DataMediaSource targetSource = targetDataMedia.getSource();
+ if (targetSource instanceof DbMediaSource
+ && StringUtils.containsIgnoreCase(((DbMediaSource) targetSource).getUrl(), "drds")) {
+ // 优先判断是否为drds
+ DbDialect targetDbDialect = dbDialectFactory.getDbDialect(pipeline.getId(),
+ (DbMediaSource) targetDataMedia.getSource());
+ if (targetDbDialect.isDRDS()) {
+ String schemaName = buildName(eventData.getSchemaName(),
+ dataMedia.getNamespaceMode(),
+ targetDataMedia.getNamespaceMode());
+ String tableName = buildName(eventData.getSchemaName(),
+ dataMedia.getNameMode(),
+ targetDataMedia.getNameMode());
+ String shardColumns = targetDbDialect.getShardColumns(schemaName, tableName);
+ if (StringUtils.isNotEmpty(shardColumns)) {
+ String columns[] = StringUtils.split(shardColumns, ',');
+ for (String key : columns) {
+ org.apache.ddlutils.model.Column col = table.findColumn(key, false);
+ if (col != null) {
+ col.setPrimaryKey(true);
+ } else {
+ logger.warn(String.format("shardColumn %s in table[%s.%s] is not found",
+ key,
+ eventData.getSchemaName(),
+ eventData.getTableName()));
+ }
}
}
}