Skip to content

Commit

Permalink
[fixed issue alibaba#694 otter 同步 RDS 到 DRDS 出现拆分键无法更新问题]
Browse files Browse the repository at this point in the history
  • Loading branch information
奕铭 committed Jan 2, 2019
1 parent 322b4de commit ee34a3d
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ public String getSelectSql(String schemaName, String tableName, String[] pkNames
return sql.toString().intern();// 不使用intern,避免方法区内存消耗过多
}

public String getUpdateSql(String schemaName, String tableName, String[] pkNames, String[] columnNames) {
public String getUpdateSql(String schemaName, String tableName, String[] pkNames, String[] columnNames, boolean updatePks, String shardColumn) {
StringBuilder sql = new StringBuilder("update " + getFullName(schemaName, tableName) + " set ");
appendColumnEquals(sql, columnNames, ",");
appendExcludeSingleShardColumnEquals(sql, columnNames, ",", updatePks, shardColumn);
sql.append(" where (");
appendColumnEquals(sql, pkNames, "and");
sql.append(")");
Expand Down Expand Up @@ -102,4 +102,25 @@ protected void appendColumnEquals(StringBuilder sql, String[] columns, String se
}
}
}

/**
* 针对DRDS改造, 在 update set 集合中, 排除 单个拆分键 的赋值操作
* @param sql
* @param columns
* @param separator
* @param excludeShardColumn 需要排除的 拆分列
*/
protected void appendExcludeSingleShardColumnEquals(StringBuilder sql, String[] columns, String separator, boolean updatePks, String excludeShardColumn) {
int size = columns.length;
for (int i = 0; i < size; i++) {
// 如果是DRDS数据库, 并且存在拆分键 且 等于当前循环列, 跳过
if(!updatePks && excludeShardColumn != null && columns[i].equals(excludeShardColumn)){
continue;
}
sql.append(" ").append(appendEscape(columns[i])).append(" = ").append("? ");
if (i != size - 1) {
sql.append(separator);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public interface SqlTemplate {

public String getSelectSql(String schemaName, String tableName, String[] pkNames, String[] columnNames);

public String getUpdateSql(String schemaName, String tableName, String[] pkNames, String[] columnNames);
public String getUpdateSql(String schemaName, String tableName, String[] pkNames, String[] columnNames, boolean updatePks, String shardColumn);

public String getDeleteSql(String schemaName, String tableName, String[] pkNames);

Expand All @@ -36,5 +36,5 @@ public interface SqlTemplate {
* 获取对应的mergeSql
*/
public String getMergeSql(String schemaName, String tableName, String[] pkNames, String[] columnNames,
String[] viewColumnNames, boolean updatePks);
String[] viewColumnNames, boolean updatePks, String shardColumn);
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class MysqlSqlTemplate extends AbstractSqlTemplate {
private static final String ESCAPE = "`";

public String getMergeSql(String schemaName, String tableName, String[] pkNames, String[] columnNames,
String[] viewColumnNames, boolean includePks) {
String[] viewColumnNames, boolean includePks, String shardColumn) {
StringBuilder sql = new StringBuilder("insert into " + getFullName(schemaName, tableName) + "(");
int size = columnNames.length;
for (int i = 0; i < size; i++) {
Expand All @@ -54,6 +54,11 @@ public String getMergeSql(String schemaName, String tableName, String[] pkNames,

size = columnNames.length;
for (int i = 0; i < size; i++) {
// 如果是DRDS数据库, 并且存在拆分键 且 等于当前循环列, 跳过
if(!includePks && shardColumn != null && columnNames[i].equals(shardColumn)){
continue;
}

sql.append(appendEscape(columnNames[i]))
.append("=values(")
.append(appendEscape(columnNames[i]))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class OracleSqlTemplate extends AbstractSqlTemplate {
* http://en.wikipedia.org/wiki/Merge_(SQL)
*/
public String getMergeSql(String schemaName, String tableName, String[] keyNames, String[] columnNames,
String[] viewColumnNames, boolean includePks) {
String[] viewColumnNames, boolean includePks, String shardColumn) {
final String aliasA = "a";
final String aliasB = "b";
StringBuilder sql = new StringBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@

package com.alibaba.otter.node.etl.load.loader.db.interceptor.sql;

import java.util.List;

import org.springframework.util.CollectionUtils;

import com.alibaba.otter.node.etl.common.db.dialect.DbDialect;
import com.alibaba.otter.node.etl.common.db.dialect.DbDialectFactory;
import com.alibaba.otter.node.etl.common.db.dialect.SqlTemplate;
Expand All @@ -30,6 +26,9 @@
import com.alibaba.otter.shared.etl.model.EventColumn;
import com.alibaba.otter.shared.etl.model.EventData;
import com.alibaba.otter.shared.etl.model.EventType;
import org.springframework.util.CollectionUtils;

import java.util.List;

/**
* 计算下最新的sql语句
Expand All @@ -50,6 +49,17 @@ public boolean before(DbLoadContext context, EventData currentData) {
String sql = null;

String schemaName = (currentData.isWithoutSchema() ? null : currentData.getSchemaName());

/**
* 针对DRDS数据库
*/
String shardColumns = null;
if(dbDialect.isDRDS()){
// 获取拆分键
shardColumns = dbDialect.getShardColumns(schemaName, currentData.getTableName());

}

// 注意insert/update语句对应的字段数序都是将主键排在后面
if (type.isInsert()) {
if (CollectionUtils.isEmpty(currentData.getColumns())
Expand All @@ -65,7 +75,8 @@ public boolean before(DbLoadContext context, EventData currentData) {
buildColumnNames(currentData.getKeys()),
buildColumnNames(currentData.getColumns()),
new String[] {},
!dbDialect.isDRDS());
!dbDialect.isDRDS(),
shardColumns);
}
} else if (type.isUpdate()) {
// String[] keyColumns = buildColumnNames(currentData.getKeys());
Expand Down Expand Up @@ -105,9 +116,10 @@ public boolean before(DbLoadContext context, EventData currentData) {
keyColumns,
otherColumns,
new String[] {},
!dbDialect.isDRDS());
!dbDialect.isDRDS(),
shardColumns);
} else {// 否则进行update sql
sql = sqlTemplate.getUpdateSql(schemaName, currentData.getTableName(), keyColumns, otherColumns);
sql = sqlTemplate.getUpdateSql(schemaName, currentData.getTableName(), keyColumns, otherColumns, !dbDialect.isDRDS(), shardColumns);
}
} else if (type.isDelete()) {
sql = sqlTemplate.getDeleteSql(schemaName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@

package com.alibaba.otter.node.etl.common.db;

import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Types;
import java.util.ArrayList;
import java.util.List;

import com.alibaba.otter.node.etl.BaseDbTest;
import com.alibaba.otter.node.etl.common.db.dialect.DbDialect;
import com.alibaba.otter.node.etl.common.db.dialect.DbDialectFactory;
import com.alibaba.otter.node.etl.common.db.dialect.SqlTemplate;
import com.alibaba.otter.node.etl.common.db.dialect.mysql.MysqlDialect;
import com.alibaba.otter.node.etl.common.db.dialect.oracle.OracleDialect;
import com.alibaba.otter.node.etl.common.db.utils.SqlUtils;
import com.alibaba.otter.shared.common.model.config.data.db.DbDataMedia;
import org.jtester.annotations.SpringBeanByName;
import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.JdbcTemplate;
Expand All @@ -33,14 +35,11 @@
import org.springframework.transaction.support.TransactionTemplate;
import org.testng.annotations.Test;

import com.alibaba.otter.node.etl.BaseDbTest;
import com.alibaba.otter.node.etl.common.db.dialect.DbDialect;
import com.alibaba.otter.node.etl.common.db.dialect.DbDialectFactory;
import com.alibaba.otter.node.etl.common.db.dialect.SqlTemplate;
import com.alibaba.otter.node.etl.common.db.dialect.mysql.MysqlDialect;
import com.alibaba.otter.node.etl.common.db.dialect.oracle.OracleDialect;
import com.alibaba.otter.node.etl.common.db.utils.SqlUtils;
import com.alibaba.otter.shared.common.model.config.data.db.DbDataMedia;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Types;
import java.util.ArrayList;
import java.util.List;

public class DbDialectTest extends BaseDbTest {

Expand Down Expand Up @@ -96,7 +95,7 @@ public Object doInPreparedStatement(PreparedStatement ps) throws SQLException, D
});
want.number(affect).isEqualTo(1);
// 执行update
sql = sqlTemplate.getUpdateSql(MYSQL_SCHEMA_NAME, TABLE_NAME, pkColumns, columns);
sql = sqlTemplate.getUpdateSql(MYSQL_SCHEMA_NAME, TABLE_NAME, pkColumns, columns, true, null);
System.out.println(sql);
affect = (Integer) jdbcTemplate.execute(sql, new PreparedStatementCallback() {

Expand All @@ -123,7 +122,7 @@ public Object doInPreparedStatement(PreparedStatement ps) throws SQLException, D
});
want.number(affect).isEqualTo(1);
// 执行merge
sql = sqlTemplate.getMergeSql(MYSQL_SCHEMA_NAME, TABLE_NAME, pkColumns, columns, null, true);
sql = sqlTemplate.getMergeSql(MYSQL_SCHEMA_NAME, TABLE_NAME, pkColumns, columns, null, true, null);
System.out.println(sql);
affect = (Integer) jdbcTemplate.execute(sql, new PreparedStatementCallback() {

Expand Down Expand Up @@ -176,7 +175,7 @@ public Object doInPreparedStatement(PreparedStatement ps) throws SQLException, D
});
want.number(affect).isEqualTo(1);
// 执行update
sql = sqlTemplate.getUpdateSql(ORACLE_SCHEMA_NAME, TABLE_NAME, pkColumns, columns);
sql = sqlTemplate.getUpdateSql(ORACLE_SCHEMA_NAME, TABLE_NAME, pkColumns, columns, true, null);
System.out.println(sql);
affect = (Integer) jdbcTemplate.execute(sql, new PreparedStatementCallback() {

Expand All @@ -203,7 +202,7 @@ public Object doInPreparedStatement(PreparedStatement ps) throws SQLException, D
});
want.number(affect).isEqualTo(1);
// 执行merge
sql = sqlTemplate.getMergeSql(ORACLE_SCHEMA_NAME, TABLE_NAME, pkColumns, columns, null, true);
sql = sqlTemplate.getMergeSql(ORACLE_SCHEMA_NAME, TABLE_NAME, pkColumns, columns, null, true, null);
System.out.println(sql);

affect = (Integer) jdbcTemplate.execute(sql, new PreparedStatementCallback() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,16 @@ public void test_mysql() {
String sql2 = sqlTemplate.getInsertSql(SCHEMA_NAME, TABLE_NAME, pkColumns, columns);
want.bool(sql1 == sql2);
// 执行update
sql1 = sqlTemplate.getUpdateSql(SCHEMA_NAME, TABLE_NAME, pkColumns, columns);
sql2 = sqlTemplate.getUpdateSql(SCHEMA_NAME, TABLE_NAME, pkColumns, columns);
sql1 = sqlTemplate.getUpdateSql(SCHEMA_NAME, TABLE_NAME, pkColumns, columns, true, null);
sql2 = sqlTemplate.getUpdateSql(SCHEMA_NAME, TABLE_NAME, pkColumns, columns, true, null);
want.bool(sql1 == sql2);
// 执行deleate
sql1 = sqlTemplate.getDeleteSql(SCHEMA_NAME, TABLE_NAME, pkColumns);
sql2 = sqlTemplate.getDeleteSql(SCHEMA_NAME, TABLE_NAME, pkColumns);
want.bool(sql1 == sql2);
// 执行merge
sql1 = sqlTemplate.getMergeSql(SCHEMA_NAME, TABLE_NAME, pkColumns, columns, null, true);
sql2 = sqlTemplate.getMergeSql(SCHEMA_NAME, TABLE_NAME, pkColumns, columns, null, true);
sql1 = sqlTemplate.getMergeSql(SCHEMA_NAME, TABLE_NAME, pkColumns, columns, null, true, null);
sql2 = sqlTemplate.getMergeSql(SCHEMA_NAME, TABLE_NAME, pkColumns, columns, null, true, null);
want.bool(sql1 == sql2);

}
Expand All @@ -62,16 +62,16 @@ public void test_oracle() {
String sql2 = sqlTemplate.getInsertSql(SCHEMA_NAME, TABLE_NAME, pkColumns, columns);
want.bool(sql1 == sql2);
// 执行update
sql1 = sqlTemplate.getUpdateSql(SCHEMA_NAME, TABLE_NAME, pkColumns, columns);
sql2 = sqlTemplate.getUpdateSql(SCHEMA_NAME, TABLE_NAME, pkColumns, columns);
sql1 = sqlTemplate.getUpdateSql(SCHEMA_NAME, TABLE_NAME, pkColumns, columns, true, null);
sql2 = sqlTemplate.getUpdateSql(SCHEMA_NAME, TABLE_NAME, pkColumns, columns, true, null);
want.bool(sql1 == sql2);
// 执行deleate
sql1 = sqlTemplate.getDeleteSql(SCHEMA_NAME, TABLE_NAME, pkColumns);
sql2 = sqlTemplate.getDeleteSql(SCHEMA_NAME, TABLE_NAME, pkColumns);
want.bool(sql1 == sql2);
// 执行merge
sql1 = sqlTemplate.getMergeSql(SCHEMA_NAME, TABLE_NAME, pkColumns, columns, null, true);
sql2 = sqlTemplate.getMergeSql(SCHEMA_NAME, TABLE_NAME, pkColumns, columns, null, true);
sql1 = sqlTemplate.getMergeSql(SCHEMA_NAME, TABLE_NAME, pkColumns, columns, null, true, null);
sql2 = sqlTemplate.getMergeSql(SCHEMA_NAME, TABLE_NAME, pkColumns, columns, null, true, null);
want.bool(sql1 == sql2);
}

Expand Down

0 comments on commit ee34a3d

Please sign in to comment.