Skip to content

Commit

Permalink
Merge branch 'apache:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian authored Nov 4, 2022
2 parents c137fab + 451a26e commit 19ad60c
Show file tree
Hide file tree
Showing 53 changed files with 2,736 additions and 296 deletions.
9 changes: 8 additions & 1 deletion .asf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,11 @@ github:
# Enable wiki
wiki: true

protected_branches:
protected_branches:

notifications:
commits: [email protected]
issues: [email protected]
pullrequests: [email protected]
jobs: [email protected]
discussions: [email protected]
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.remoting.RPCHook;
Expand All @@ -39,6 +41,7 @@
import org.apache.rocketmq.tools.command.CommandUtil;

import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
Expand All @@ -62,10 +65,41 @@ public static String createUniqInstance(String prefix) {
return new StringBuffer(prefix).append("-").append(UUID.randomUUID()).toString();
}

private static RPCHook getAclRPCHook(String accessKey, String secretKey) {
public static RPCHook getAclRPCHook(String accessKey, String secretKey) {
return new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
}

/**
* init default lite pull consumer
*
* @param connectConfig
* @param autoCommit
* @return
* @throws MQClientException
*/
public static DefaultLitePullConsumer initDefaultLitePullConsumer(RocketMqConnectConfig connectConfig, boolean autoCommit) throws MQClientException {
DefaultLitePullConsumer consumer = null;
if (Objects.isNull(consumer)) {
if (StringUtils.isBlank(connectConfig.getAccessKey()) && StringUtils.isBlank(connectConfig.getSecretKey())) {
consumer = new DefaultLitePullConsumer(
connectConfig.getRmqConsumerGroup()
);
} else {
consumer = new DefaultLitePullConsumer(
connectConfig.getRmqConsumerGroup(),
getAclRPCHook(connectConfig.getAccessKey(), connectConfig.getSecretKey())
);
}
}
consumer.setNamesrvAddr(connectConfig.getNamesrvAddr());
String uniqueName = Thread.currentThread().getName() + "-" + System.currentTimeMillis() % 1000;
consumer.setInstanceName(uniqueName);
consumer.setUnitName(uniqueName);
consumer.setAutoCommit(autoCommit);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
return consumer;
}

/**
* init default lite pull consumer
*
Expand Down Expand Up @@ -169,6 +203,7 @@ public static boolean topicExist(RocketMqConnectConfig connectConfig, String top
return foundTopicRouteInfo;
}


public static Set<String> fetchAllConsumerGroup(RocketMqConnectConfig connectConfig) {
Set<String> consumerGroupSet = Sets.newHashSet();
DefaultMQAdminExt defaultMQAdminExt = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,12 @@

import io.debezium.config.Configuration;

import java.util.Objects;

/**
* rocketmq connect config
*/
public class RocketMqConnectConfig {

private String dbHistoryName;

private String namesrvAddr;

private int operationTimeout = 3000;
Expand All @@ -48,27 +47,29 @@ public class RocketMqConnectConfig {
private String secretKey;


public static Builder newBuilder(){
return new Builder();
}

public RocketMqConnectConfig() {}

public RocketMqConnectConfig(Configuration config, String dbHistoryName) {
this.dbHistoryName = dbHistoryName;
this.rmqConsumerGroup = this.dbHistoryName.concat("-group");
public RocketMqConnectConfig(String rmqConsumerGroup, String namesrvAddr, boolean aclEnable, String accessKey, String secretKey) {
this.rmqConsumerGroup = rmqConsumerGroup;
this.namesrvAddr = namesrvAddr;
this.aclEnable = aclEnable;
this.accessKey = accessKey;
this.secretKey = secretKey;
}

public RocketMqConnectConfig(Configuration config, String prefixGroupName) {
this.rmqConsumerGroup = prefixGroupName.concat("-group");
// init rocketmq connection
this.namesrvAddr = config.getString(RocketMqDatabaseHistory.NAME_SRV_ADDR);
this.aclEnable = config.getBoolean(RocketMqDatabaseHistory.ROCKETMQ_ACL_ENABLE);
this.accessKey = config.getString(RocketMqDatabaseHistory.ROCKETMQ_ACCESS_KEY);
this.secretKey = config.getString(RocketMqDatabaseHistory.ROCKETMQ_SECRET_KEY);
}


public String getDbHistoryName() {
return dbHistoryName;
}

public void setDbHistoryName(String dbHistoryName) {
this.dbHistoryName = dbHistoryName;
}

public String getNamesrvAddr() {
return namesrvAddr;
}
Expand Down Expand Up @@ -149,11 +150,23 @@ public void setSecretKey(String secretKey) {
this.secretKey = secretKey;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
RocketMqConnectConfig that = (RocketMqConnectConfig) o;
return operationTimeout == that.operationTimeout && rmqMaxRedeliveryTimes == that.rmqMaxRedeliveryTimes && rmqMessageConsumeTimeout == that.rmqMessageConsumeTimeout && rmqMaxConsumeThreadNums == that.rmqMaxConsumeThreadNums && rmqMinConsumeThreadNums == that.rmqMinConsumeThreadNums && aclEnable == that.aclEnable && Objects.equals(namesrvAddr, that.namesrvAddr) && Objects.equals(rmqConsumerGroup, that.rmqConsumerGroup) && Objects.equals(accessKey, that.accessKey) && Objects.equals(secretKey, that.secretKey);
}

@Override
public int hashCode() {
return Objects.hash(namesrvAddr, operationTimeout, rmqConsumerGroup, rmqMaxRedeliveryTimes, rmqMessageConsumeTimeout, rmqMaxConsumeThreadNums, rmqMinConsumeThreadNums, aclEnable, accessKey, secretKey);
}

@Override
public String toString() {
return "RocketMqConnectConfig{" +
"dbHistoryName='" + dbHistoryName + '\'' +
", namesrvAddr='" + namesrvAddr + '\'' +
"namesrvAddr='" + namesrvAddr + '\'' +
", operationTimeout=" + operationTimeout +
", rmqConsumerGroup='" + rmqConsumerGroup + '\'' +
", rmqMaxRedeliveryTimes=" + rmqMaxRedeliveryTimes +
Expand All @@ -165,4 +178,40 @@ public String toString() {
", secretKey='" + secretKey + '\'' +
'}';
}

public static class Builder{
private String namesrvAddr;
private String rmqConsumerGroup;
/** set acl config **/
private boolean aclEnable;
private String accessKey;
private String secretKey;

public Builder namesrvAddr(String namesrvAddr){
this.namesrvAddr = namesrvAddr;
return this;
}

public Builder rmqConsumerGroup(String rmqConsumerGroup){
this.rmqConsumerGroup = rmqConsumerGroup;
return this;
}

public Builder aclEnable(boolean aclEnable){
this.aclEnable = aclEnable;
return this;
}

public Builder accessKey(String accessKey){
this.accessKey = accessKey;
return this;
}
public Builder secretKey(String secretKey){
this.secretKey = secretKey;
return this;
}
public RocketMqConnectConfig build(){
return new RocketMqConnectConfig(rmqConsumerGroup, namesrvAddr, aclEnable, accessKey, secretKey);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,24 @@

<dependencies>

<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-api</artifactId>
<version>${debezium.version}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>
<version>${debezium.version}</version>
</dependency>

<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mysql</artifactId>
<version>${debezium.version}</version>
</dependency>


<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>kafka-connect-adaptor</artifactId>
Expand Down Expand Up @@ -152,24 +164,36 @@
<artifactId>findbugs-maven-plugin</artifactId>
<version>3.0.4</version>
</plugin>

<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>io.debezium:debezium-connector-mysql</artifact>
<excludes>
<exclude>io/debezium/connector/mysql/MySqlReadOnlyIncrementalSnapshotChangeEventSource*.*</exclude>
<exclude>io/debezium/connector/mysql/MySqlReadOnlyIncrementalSnapshotContext*.*</exclude>
</excludes>
</filter>
</filters>
<shadedArtifactAttached>true</shadedArtifactAttached>
<shadedClassifierName>jar-with-dependencies</shadedClassifierName>
</configuration>
</execution>
</executions>
</plugin>



<plugin>
<artifactId>maven-checkstyle-plugin</artifactId>
<version>2.17</version>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
# Incremental Snapshots

## 1.参考
- DBLog
> https://arxiv.org/pdf/2010.12597v1.pdf
- Incremental Snapshots
> https://debezium.io/blog/2021/10/07/incremental-snapshots/
- Read-only Incremental Snapshots
> https://debezium.io/blog/2022/04/07/read-only-incremental-snapshots/

## 问题
当前debezium实现的默认全量快照功能,在全量过程中会出现锁表的现象,可能会导致以下问题
>1. 导致在使用过程中无法对表进行写入,这样快照时间只能避开用户使用期间,无法随时随地对表进行快照
>2. 数据量大了可能造成锁表时间超时,快照失败
>3. 全量快照未保留上次快照的位点,一旦失败会造成重新全量快照,无法增量;
## Read-only Incremental Snapshots(mysql)

### 先决条件

此方案只适用于Mysql , 并依赖全局事务标志GTID,因此,如果你从只读副本读取,则需要设置 gtid_mode=on , 以保留GTID的顺序
```
gtid_mode = ON
enforce_gtid_consistency = ON
if replica_parallel_workers > 0 set replica_preserve_commit_order = ON
```
该算法运行SHOW MASTER STATUS查询以获取在块选择之前和之后设置的已执行 GTID:
```
low watermark = executed_gtid_set
high watermark = executed_gtid_set - low watermark
```

## 2.任务设置
```
{
"connector.class":"org.apache.rocketmq.connect.debezium.mysql.DebeziumMysqlConnector",
"max.tasks":"1",
"connect.topicname":"debezium-mysql-source-0002",
"kafka.transforms": "Unwrap",
"kafka.transforms.Unwrap.delete.handling.mode": "none",
"kafka.transforms.Unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"kafka.transforms.Unwrap.add.headers": "op,source.db,source.table",
"database.history.name.srv.addr": "localhost:9876",
"database.history.rocketmq.topic": "db-history-debezium-topic-0002",
"database.history.store.only.monitored.tables.ddl": true,
"database.serverTimezone":"UTC",
"database.user": "bizworks",
"include.schema.changes": false,
"database.server.name": "server-0002",
"database.port": 3306,
"database.hostname": "localhost",
"database.password": "******",
"table.include.list": "test_database.employee_copy3",
"max.batch.size": 5,
"database.include.list": "test_database",
"snapshot.mode": "schema_only",
"read.only": "true",
"incremental.snapshot.allow.schema.changes": "true",
"incremental.snapshot.chunk.size": "5000",
"signal.rocketmq.topic": "dbz-signals-001",
"signal.name.srv.addr": "localhost:9876",
"key.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter",
"value.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"
}
```
### 参数说明
- "read.only": "true",
> true 为开启 readonly 的增量快照模式
- snapshot.mode
> 增量快照下配置成schema_only,其它方式暂未测试,never会有空指针错误,后面修复
- incremental.snapshot.allow.schema.changes
> 快照过程中是否允许schema 发生变化
- signal.rocketmq.topic
> 指定监听的信令的topic
- signal.name.srv.addr
> rocketmq name server addr
### 信令发送

```
sh mqadmin sendMessage -t dbz-signals-001 -n localhost:9876 -k server-0002 -p {"type":"execute-snapshot","data": {"data-collections": ["test_database.employee_copy3"], "type": "INCREMENTAL"}}
```
#### 信令发送配置

- rocketmq message key
> rocketmq 发送的消息key是 [database.server.name] 配置项
- rocketmq message value :
> 1. type: 固定值 "execute-snapshot"
> 2. data.data-collections :为要监听的表的集合
> 3. data.type :固定类型 "INCREMENTAL"
>
```
{
"type":"execute-snapshot",
"data":{
"data-collections":[
"test_database.employee_copy3"
],
"type":"INCREMENTAL"
}
}
```
Loading

0 comments on commit 19ad60c

Please sign in to comment.