Skip to content

Commit

Permalink
support hologres connector
Browse files Browse the repository at this point in the history
  • Loading branch information
ferrirW committed Jun 7, 2023
1 parent 85d9c03 commit 44b80d8
Show file tree
Hide file tree
Showing 16 changed files with 1,051 additions and 7 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,7 @@ Environment variables配置

> CONNECT_HOME=${user path}/rocketmq-connect/distribution
###集群模式启动Connect Worker
### 集群模式启动Connect Worker

Main Class配置
>org.apache.rocketmq.connect.runtime.DistributedConnectStartup
Expand Down
38 changes: 38 additions & 0 deletions connectors/rocketmq-connect-hologres/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/

### IntelliJ IDEA ###
.idea/modules.xml
.idea/jarRepositories.xml
.idea/compiler.xml
.idea/libraries/
*.iws
*.iml
*.ipr

### Eclipse ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache

### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
!**/src/main/**/build/
!**/src/test/**/build/

### VS Code ###
.vscode/

### Mac OS ###
.DS_Store
87 changes: 87 additions & 0 deletions connectors/rocketmq-connect-hologres/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-connect-hologres</artifactId>
<version>0.0.1-SNAPSHOT</version>

<name>rocketmq-connect-hologres</name>
<url>https://github.com/apache/rocketmq-connect/connectors/rocketmq-connect-hologres</url>

<licenses>
<license>
<name>The Apache Software License, Version 2.0</name>
<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
</license>
</licenses>

<issueManagement>
<system>jira</system>
<url>https://issues.apache.org/jira/browse/RocketMQ</url>
</issueManagement>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
</properties>

<dependencies>
<dependency>
<groupId>io.openmessaging</groupId>
<artifactId>openmessaging-connector</artifactId>
<version>0.1.4</version>
</dependency>

<dependency>
<groupId>com.alibaba.hologres</groupId>
<artifactId>holo-client</artifactId>
<version>2.2.8</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
**rocketmq-connect-hologres**

在启动runtime之后,通过发送http消息到runtime,携带connector和task的参数,启动connector

**参数说明**

- **connector-class**: connector的类名
- **tasks.num**: 启动的task数目

##### parameter configuration

| parameter | effect | required | default |
|-----------|---------------------------------|----------|---------|
| host | The Host of the hologres server | yes | null |
| port | The Port of the hologres server | yes | null |
| database | The info of the database | yes | null |
| table | The info of the table | yes | null |
| username | The info of the username | yes | null |
| password | The info of the password | yes | null |

**启动 Source Connector**
curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/hologresSourceConnector -d '{"connector.class":"org.apache.rocketmq.connect.hologres.connector.HologresSourceConnector","host":"localhost","port":"80","database":"test_db","username":"","password":"","table":"public.test_table","connect.topicname":"holoTopic","max.tasks":"2","slotName":"","startTime":"2023-05-31 12:00:00","value.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter","key.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"}'

```
POST http://${runtime-ip}:${runtime-port}/connectors/hologresSinkConnector
{
"connector.class":"org.apache.rocketmq.connect.hologres.connector.HologresSinkConnector",
"host":"localhost",
"port":80,
"database":"test_db",
"table":"public.test_table",
"max.tasks":2,
"connect.topicname":"holoTopic",
"slotName":"",
"startTime":"2023-05-31 12:00:00",
"value.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter",
"key.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"
}
```

##### parameter configuration

| parameter | effect | required | default |
|----------------------------|------------------------------------------|----------|---------|
| slotName | The slotName of the binlog | yes | null |
| startTime | Where to start consume binlog | yes | null |
| binlogReadBatchSize | The read batch size of binlog reader | no | 1024 |
| binlogHeartBeatIntervalMs | The heart beat interval of binlog reader | no | -1 |
| binlogIgnoreDelete | Whether ignore Delete operation | no | false |
| binlogIgnoreBeforeUpdate | Whether ignore BeforeUpdate operation | no | false |
| retryCount | The max retry times of consume binlog | no | 3 |
| binlogCommitTimeIntervalMs | The commit interval of binlog | no | 5000 |

**启动Sink Connector**
curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/hologresSinkConnector -d '{"connector.class":"org.apache.rocketmq.connect.hologres.connector.HologresSinkConnector","host":"localhost","port":"80","database":"test_db","username":"","password":"","table":"public.test_table","connect.topicnames":"holoTopic","max.tasks":"2","value.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter","key.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"}'

```
POST http://${runtime-ip}:${runtime-port}/connectors/hologresSinkConnector
{
"connector.class":"org.apache.rocketmq.connect.hologres.connector.HologresSinkConnector",
"host":"localhost",
"port":80,
"database":"test_db",
"table":"public.test_table",
"max.tasks":2,
"connect.topicnames":"holoTopic",
"value.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter",
"key.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"
}
```

##### parameter configuration

| parameter | effect | required | default |
|------------------|------------------------------------|----------|-------------------|
| dynamicPartition | Whether open dynamicPartition | no | false |
| writeMode | The write mode of the holo client | no | INSERT_OR_REPLACE |

**查看Connector运行状态**

http://127.0.0.1:8081/connectors/connector-name/status

**查看Connector配置**

http://127.0.0.1:8081/connectors/connector-name/config

**关闭Connector**

http://127.0.0.1:8081/connectors/connector-name/stop
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.connect.hologres.config;

import io.openmessaging.KeyValue;

import java.util.HashSet;
import java.util.Set;

public class AbstractHologresConfig {

public static final Set<String> REQUEST_CONFIG = new HashSet<String>(){
{
add(HologresConstant.HOST);
add(HologresConstant.PORT);
add(HologresConstant.DATABASE);
}
};

private KeyValue keyValue;
private String host;
private int port;
private String username;
private String password;
private String database;
private String table;

public AbstractHologresConfig(KeyValue keyValue) {
this.keyValue = keyValue;
ConfigUtils.load(keyValue, this);
}

public KeyValue getKeyValue() {
return keyValue;
}

public void setKeyValue(KeyValue keyValue) {
this.keyValue = keyValue;
}

public String getHost() {
return host;
}

public void setHost(String host) {
this.host = host;
}

public int getPort() {
return port;
}

public void setPort(int port) {
this.port = port;
}

public String getUsername() {
return username;
}

public void setUsername(String username) {
this.username = username;
}

public String getPassword() {
return password;
}

public void setPassword(String password) {
this.password = password;
}

public String getDatabase() {
return database;
}

public void setDatabase(String database) {
this.database = database;
}

public String getTable() {
return table;
}

public void setTable(String table) {
this.table = table;
}

public String getJdbcUrl() {
return String.format("%s%s:%d/%s", HologresConstant.URL_PREFIX, getHost(), getPort(), getDatabase());
}
}
Loading

0 comments on commit 44b80d8

Please sign in to comment.