Skip to content

Commit

Permalink
fixed drds support
Browse files Browse the repository at this point in the history
  • Loading branch information
agapple committed Oct 27, 2018
1 parent 0660911 commit bf1f1b9
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@
</delete>


<delete id="deleteByGmtModified" parameterClass="java.util.Map">
<delete id="deleteByTimestamp" parameterClass="java.util.Map">
<![CDATA[
delete from meta_history
where gmt_modified < timestamp(#timestamp#)
where destination=#destination# and binlog_timestamp < #timestamp#
]]>
</delete>
</sqlMap>
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@
where destination=#destination#
</delete>

<delete id="deleteByGmtModified" parameterClass="java.util.Map">
<delete id="deleteByTimestamp" parameterClass="java.util.Map">
<![CDATA[
delete from meta_snapshot
where gmt_modified < timestamp(#timestamp#)
where destination=#destination# and binlog_timestamp < #timestamp# and binlog_timestamp > 0
]]>
</delete>
</sqlMap>
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import com.alibaba.otter.shared.common.model.config.data.DataMedia;
import com.alibaba.otter.shared.common.model.config.data.DataMedia.ModeValue;
import com.alibaba.otter.shared.common.model.config.data.DataMediaPair;
import com.alibaba.otter.shared.common.model.config.data.DataMediaSource;
import com.alibaba.otter.shared.common.model.config.data.db.DbMediaSource;
import com.alibaba.otter.shared.common.model.config.pipeline.Pipeline;
import com.alibaba.otter.shared.common.model.config.pipeline.PipelineParameter;
Expand Down Expand Up @@ -439,27 +440,32 @@ private EventData internParse(Pipeline pipeline, Entry entry, RowChange rowChang
}
// 获取一下目标库的拆分字段,设置源表为主键
// 首先要求源和目标的库名表名是一致的
DbDialect targetDbDialect = dbDialectFactory.getDbDialect(pipeline.getId(),
(DbMediaSource) targetDataMedia.getSource());
if (targetDbDialect.isDRDS()) {
String schemaName = buildName(eventData.getSchemaName(),
dataMedia.getNamespaceMode(),
targetDataMedia.getNamespaceMode());
String tableName = buildName(eventData.getSchemaName(),
dataMedia.getNameMode(),
targetDataMedia.getNameMode());
String shardColumns = targetDbDialect.getShardColumns(schemaName, tableName);
if (StringUtils.isNotEmpty(shardColumns)) {
String columns[] = StringUtils.split(shardColumns, ',');
for (String key : columns) {
org.apache.ddlutils.model.Column col = table.findColumn(key, false);
if (col != null) {
col.setPrimaryKey(true);
} else {
logger.warn(String.format("shardColumn %s in table[%s.%s] is not found",
key,
eventData.getSchemaName(),
eventData.getTableName()));
DataMediaSource targetSource = targetDataMedia.getSource();
if (targetSource instanceof DbMediaSource
&& StringUtils.containsIgnoreCase(((DbMediaSource) targetSource).getUrl(), "drds")) {
// 优先判断是否为drds
DbDialect targetDbDialect = dbDialectFactory.getDbDialect(pipeline.getId(),
(DbMediaSource) targetDataMedia.getSource());
if (targetDbDialect.isDRDS()) {
String schemaName = buildName(eventData.getSchemaName(),
dataMedia.getNamespaceMode(),
targetDataMedia.getNamespaceMode());
String tableName = buildName(eventData.getSchemaName(),
dataMedia.getNameMode(),
targetDataMedia.getNameMode());
String shardColumns = targetDbDialect.getShardColumns(schemaName, tableName);
if (StringUtils.isNotEmpty(shardColumns)) {
String columns[] = StringUtils.split(shardColumns, ',');
for (String key : columns) {
org.apache.ddlutils.model.Column col = table.findColumn(key, false);
if (col != null) {
col.setPrimaryKey(true);
} else {
logger.warn(String.format("shardColumn %s in table[%s.%s] is not found",
key,
eventData.getSchemaName(),
eventData.getTableName()));
}
}
}
}
Expand Down

0 comments on commit bf1f1b9

Please sign in to comment.