diff --git a/README.md b/README.md
index 89b7b2e35..20c5b8ac6 100644
--- a/README.md
+++ b/README.md
@@ -485,7 +485,7 @@ Environment variables配置
> CONNECT_HOME=${user path}/rocketmq-connect/distribution
-###集群模式启动Connect Worker
+### 集群模式启动Connect Worker
Main Class配置
>org.apache.rocketmq.connect.runtime.DistributedConnectStartup
diff --git a/connectors/rocketmq-connect-hologres/.gitignore b/connectors/rocketmq-connect-hologres/.gitignore
new file mode 100644
index 000000000..5ff6309b7
--- /dev/null
+++ b/connectors/rocketmq-connect-hologres/.gitignore
@@ -0,0 +1,38 @@
+target/
+!.mvn/wrapper/maven-wrapper.jar
+!**/src/main/**/target/
+!**/src/test/**/target/
+
+### IntelliJ IDEA ###
+.idea/modules.xml
+.idea/jarRepositories.xml
+.idea/compiler.xml
+.idea/libraries/
+*.iws
+*.iml
+*.ipr
+
+### Eclipse ###
+.apt_generated
+.classpath
+.factorypath
+.project
+.settings
+.springBeans
+.sts4-cache
+
+### NetBeans ###
+/nbproject/private/
+/nbbuild/
+/dist/
+/nbdist/
+/.nb-gradle/
+build/
+!**/src/main/**/build/
+!**/src/test/**/build/
+
+### VS Code ###
+.vscode/
+
+### Mac OS ###
+.DS_Store
\ No newline at end of file
diff --git a/connectors/rocketmq-connect-hologres/pom.xml b/connectors/rocketmq-connect-hologres/pom.xml
new file mode 100644
index 000000000..8939576fb
--- /dev/null
+++ b/connectors/rocketmq-connect-hologres/pom.xml
@@ -0,0 +1,87 @@
+
+
+
+
+ 4.0.0
+
+ org.apache.rocketmq
+ rocketmq-connect-hologres
+ 0.0.1-SNAPSHOT
+
+ rocketmq-connect-hologres
+ https://github.com/apache/rocketmq-connect/tree/master/connectors/rocketmq-connect-hologres
+
+
+
+ The Apache Software License, Version 2.0
+ http://www.apache.org/licenses/LICENSE-2.0.txt
+
+
+
+
+ jira
+ https://issues.apache.org/jira/browse/RocketMQ
+
+
+
+ 8
+ 8
+ UTF-8
+ UTF-8
+
+
+
+
+ io.openmessaging
+ openmessaging-connector
+ 0.1.4
+
+
+
+ com.alibaba.hologres
+ holo-client
+ 2.2.8
+
+
+
+
+
+
+ maven-assembly-plugin
+ 3.0.0
+
+
+ jar-with-dependencies
+
+
+
+
+ make-assembly
+ package
+
+ single
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/README.md b/connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/README.md
new file mode 100644
index 000000000..104783768
--- /dev/null
+++ b/connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/README.md
@@ -0,0 +1,89 @@
+**rocketmq-connect-hologres**
+
+在启动runtime之后,通过发送http消息到runtime,携带connector和task的参数,启动connector
+
+**参数说明**
+
+- **connector-class**: connector的类名
+- **tasks.num**: 启动的task数目
+
+##### parameter configuration
+
+| parameter | effect | required | default |
+|-----------|-----------------------------|----------|---------|
+| host | The host of the Hologres instance | yes | null |
+| port | The port of the Hologres instance | yes | null |
+| database | The name of the Hologres database | yes | null |
+| table | The name of the Hologres table | yes | null |
+| username | The username (access key) for the Hologres instance | yes | null |
+| password | The password (access secret) for the Hologres instance | yes | null |
+
+**启动 Source Connector**
+curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/hologresSourceConnector -d '{"connector.class":"org.apache.rocketmq.connect.hologres.connector.HologresSourceConnector","host":"localhost","port":"80","database":"test_db","username":"","password":"","table":"public.test_table","connect.topicname":"holoTopic","max.tasks":"2","slotName":"","startTime":"2023-05-31 12:00:00","value.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter","key.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"}'
+
+```
+POST http://${runtime-ip}:${runtime-port}/connectors/hologresSourceConnector
+{
+ "connector.class":"org.apache.rocketmq.connect.hologres.connector.HologresSourceConnector",
+ "host":"localhost",
+ "port":80,
+ "database":"test_db",
+ "table":"public.test_table",
+ "max.tasks":2,
+ "connect.topicname":"holoTopic",
+ "slotName":"",
+ "startTime":"2023-05-31 12:00:00",
+ "value.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter",
+ "key.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"
+}
+```
+
+##### parameter configuration
+
+| parameter | effect | required | default |
+|----------------------------|------------------------------------------|----------|---------|
+| slotName | The slotName of the binlog | yes | null |
+| startTime | Where to start consume binlog | yes | null |
+| binlogReadBatchSize | The read batch size of binlog reader | no | 1024 |
+| binlogHeartBeatIntervalMs | The heart beat interval of binlog reader | no | -1 |
+| binlogIgnoreDelete | Whether ignore Delete operation | no | false |
+| binlogIgnoreBeforeUpdate | Whether ignore BeforeUpdate operation | no | false |
+| retryCount | The max retry times of consume binlog | no | 3 |
+| binlogCommitIntervalMs | The commit interval of binlog | no | 5000 |
+
+**启动Sink Connector**
+curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/hologresSinkConnector -d '{"connector.class":"org.apache.rocketmq.connect.hologres.connector.HologresSinkConnector","host":"localhost","port":"80","database":"test_db","username":"","password":"","table":"public.test_table","connect.topicnames":"holoTopic","max.tasks":"2","value.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter","key.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"}'
+
+```
+POST http://${runtime-ip}:${runtime-port}/connectors/hologresSinkConnector
+{
+ "connector.class":"org.apache.rocketmq.connect.hologres.connector.HologresSinkConnector",
+ "host":"localhost",
+ "port":80,
+ "database":"test_db",
+ "table":"public.test_table",
+ "max.tasks":2,
+ "connect.topicnames":"holoTopic",
+ "value.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter",
+ "key.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"
+}
+```
+
+##### parameter configuration
+
+| parameter | effect | required | default |
+|------------------|------------------------------------|----------|-------------------|
+| dynamicPartition | Whether open dynamicPartition | no | false |
+| writeMode | The write mode of the holo client | no | INSERT_OR_REPLACE |
+
+**查看Connector运行状态**
+
+http://127.0.0.1:8081/connectors/connector-name/status
+
+**查看Connector配置**
+
+http://127.0.0.1:8081/connectors/connector-name/config
+
+**关闭Connector**
+
+http://127.0.0.1:8081/connectors/connector-name/stop
diff --git a/connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/config/AbstractHologresConfig.java b/connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/config/AbstractHologresConfig.java
new file mode 100644
index 000000000..59d49192f
--- /dev/null
+++ b/connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/config/AbstractHologresConfig.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.hologres.config;
+
+import io.openmessaging.KeyValue;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class AbstractHologresConfig {
+
+ public static final Set REQUEST_CONFIG = new HashSet(){
+ {
+ add(HologresConstant.HOST);
+ add(HologresConstant.PORT);
+ add(HologresConstant.DATABASE);
+ }
+ };
+
+ private KeyValue keyValue;
+ private String host;
+ private int port;
+ private String username;
+ private String password;
+ private String database;
+ private String table;
+
+ public AbstractHologresConfig(KeyValue keyValue) {
+ this.keyValue = keyValue;
+ ConfigUtils.load(keyValue, this);
+ }
+
+ public KeyValue getKeyValue() {
+ return keyValue;
+ }
+
+ public void setKeyValue(KeyValue keyValue) {
+ this.keyValue = keyValue;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public void setHost(String host) {
+ this.host = host;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public void setPort(int port) {
+ this.port = port;
+ }
+
+ public String getUsername() {
+ return username;
+ }
+
+ public void setUsername(String username) {
+ this.username = username;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ public void setPassword(String password) {
+ this.password = password;
+ }
+
+ public String getDatabase() {
+ return database;
+ }
+
+ public void setDatabase(String database) {
+ this.database = database;
+ }
+
+ public String getTable() {
+ return table;
+ }
+
+ public void setTable(String table) {
+ this.table = table;
+ }
+
+ public String getJdbcUrl() {
+ return String.format("%s%s:%d/%s", HologresConstant.URL_PREFIX, getHost(), getPort(), getDatabase());
+ }
+}
diff --git a/connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/config/ConfigUtils.java b/connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/config/ConfigUtils.java
new file mode 100644
index 000000000..aed52d8d5
--- /dev/null
+++ b/connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/config/ConfigUtils.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.hologres.config;
+
+import io.openmessaging.KeyValue;
+
+import java.lang.reflect.Method;
+
+public class ConfigUtils {
+
+ public static void load(KeyValue keyValue, Object object) {
+ properties2Object(keyValue, object);
+ }
+
+ private static void properties2Object(final KeyValue p, final Object object) {
+ Method[] methods = object.getClass().getMethods();
+ for (Method method : methods) {
+ String mn = method.getName();
+ if (mn.startsWith("set")) {
+ try {
+ String tmp = mn.substring(4);
+ String first = mn.substring(3, 4);
+
+ String key = first.toLowerCase() + tmp;
+ String property = p.getString(key);
+ if (property != null) {
+ Class>[] pt = method.getParameterTypes();
+ if (pt != null && pt.length > 0) {
+ String cn = pt[0].getSimpleName();
+ Object arg;
+ if (cn.equals("int") || cn.equals("Integer")) {
+ arg = Integer.parseInt(property);
+ } else if (cn.equals("long") || cn.equals("Long")) {
+ arg = Long.parseLong(property);
+ } else if (cn.equals("double") || cn.equals("Double")) {
+ arg = Double.parseDouble(property);
+ } else if (cn.equals("boolean") || cn.equals("Boolean")) {
+ arg = Boolean.parseBoolean(property);
+ } else if (cn.equals("float") || cn.equals("Float")) {
+ arg = Float.parseFloat(property);
+ } else if (cn.equals("String")) {
+ arg = property;
+ } else {
+ continue;
+ }
+ method.invoke(object, arg);
+ }
+ }
+ } catch (Throwable ignored) {
+ }
+ }
+ }
+ }
+}
diff --git a/connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/config/HologresConstant.java b/connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/config/HologresConstant.java
new file mode 100644
index 000000000..f1abacff7
--- /dev/null
+++ b/connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/config/HologresConstant.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.hologres.config;
+
+public class HologresConstant {
+
+ public static String TASK_NUM = "tasks.num";
+ public static final String DRIVER_NAME = "com.aliyun.hologres.jdbc.HoloDriver";
+ public static final String URL_PREFIX = "jdbc:hologres://";
+ public static final String USERNAME = "username";
+ public static final String PASSWORD = "password";
+ public static final String HOST = "host";
+ public static final String PORT = "port";
+ public static final String DATABASE = "database";
+ public static final String TABLES = "tables";
+ public static final String TABLE_NAME = "tableName";
+
+ // source related
+ public static final String SOURCE_BATCH_SIZE = "binlogReadBatchSize";
+ public static final int SOURCE_BATCH_SIZE_DEFAULT = 1024;
+ public static final String SOURCE_HEARTBEAT_INTERVAL = "binlogHeartBeatIntervalMs";
+ public static final int SOURCE_HEARTBEAT_INTERVAL_DEFAULT = -1;
+ public static final String SOURCE_IGNORE_DELETE = "binlogIgnoreDelete";
+ public static final String SOURCE_IGNORE_DELETE_DEFAULT = "false";
+ public static final String SOURCE_IGNORE_BEFORE_UPDATE = "binlogIgnoreBeforeUpdate";
+ public static final String SOURCE_IGNORE_BEFORE_UPDATE_DEFAULT = "false";
+ public static final String SOURCE_RETRY_COUNT = "retryCount";
+ public static final int SOURCE_RETRY_COUNT_DEFAULT = 3;
+ public static final String SOURCE_COMMIT_TIME_INTERVAL = "binlogCommitIntervalMs";
+ public static final int SOURCE_COMMIT_TIME_INTERVAL_DEFAULT = 5000;
+ public static final String SOURCE_SLOT_NAME = "slotName";
+ public static final String PARTITION_INFO_KEY = "partitionInfo";
+ public static final String PARTITION_INDEX_KEY = "partitionIndex";
+ public static final String HOLOGRES_POSITION = "HOLOGRES_POSITION";
+
+ // sink related
+ public static final String DYNAMIC_PARTITION = "dynamicPartition";
+ public static final String DYNAMIC_PARTITION_DEFAULT = "false";
+ public static final String WRITE_MODE = "writeMode";
+ public static final String WRITE_MODE_DEFAULT = "INSERT_OR_REPLACE";
+
+ public static final String WRITE_BATCH = "writeBatch";
+ public static final int WRITE_BATCH_DEFAULT = 512;
+
+}
diff --git a/connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/config/HologresSinkConfig.java b/connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/config/HologresSinkConfig.java
new file mode 100644
index 000000000..aecbc8341
--- /dev/null
+++ b/connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/config/HologresSinkConfig.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.hologres.config;
+
+import io.openmessaging.KeyValue;
+
+import static org.apache.rocketmq.connect.hologres.config.HologresConstant.DYNAMIC_PARTITION;
+import static org.apache.rocketmq.connect.hologres.config.HologresConstant.DYNAMIC_PARTITION_DEFAULT;
+import static org.apache.rocketmq.connect.hologres.config.HologresConstant.WRITE_MODE;
+import static org.apache.rocketmq.connect.hologres.config.HologresConstant.WRITE_MODE_DEFAULT;
+
+public class HologresSinkConfig extends AbstractHologresConfig {
+
+ private boolean dynamicPartition;
+ private String writeMode;
+
+ public HologresSinkConfig(KeyValue keyValue) {
+ super(keyValue);
+ this.dynamicPartition = Boolean.parseBoolean(keyValue.getString(DYNAMIC_PARTITION, DYNAMIC_PARTITION_DEFAULT));
+ this.writeMode = keyValue.getString(WRITE_MODE, WRITE_MODE_DEFAULT);
+ }
+
+ public boolean isDynamicPartition() {
+ return dynamicPartition;
+ }
+
+ public void setDynamicPartition(boolean dynamicPartition) {
+ this.dynamicPartition = dynamicPartition;
+ }
+
+ public String getWriteMode() {
+ return writeMode;
+ }
+
+ public void setWriteMode(String writeMode) {
+ this.writeMode = writeMode;
+ }
+}
diff --git a/connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/config/HologresSourceConfig.java b/connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/config/HologresSourceConfig.java
new file mode 100644
index 000000000..4e6bd766d
--- /dev/null
+++ b/connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/config/HologresSourceConfig.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.hologres.config;
+
+import io.openmessaging.KeyValue;
+
+import static org.apache.rocketmq.connect.hologres.config.HologresConstant.SOURCE_BATCH_SIZE;
+import static org.apache.rocketmq.connect.hologres.config.HologresConstant.SOURCE_BATCH_SIZE_DEFAULT;
+import static org.apache.rocketmq.connect.hologres.config.HologresConstant.SOURCE_COMMIT_TIME_INTERVAL;
+import static org.apache.rocketmq.connect.hologres.config.HologresConstant.SOURCE_COMMIT_TIME_INTERVAL_DEFAULT;
+import static org.apache.rocketmq.connect.hologres.config.HologresConstant.SOURCE_HEARTBEAT_INTERVAL;
+import static org.apache.rocketmq.connect.hologres.config.HologresConstant.SOURCE_HEARTBEAT_INTERVAL_DEFAULT;
+import static org.apache.rocketmq.connect.hologres.config.HologresConstant.SOURCE_IGNORE_BEFORE_UPDATE;
+import static org.apache.rocketmq.connect.hologres.config.HologresConstant.SOURCE_IGNORE_BEFORE_UPDATE_DEFAULT;
+import static org.apache.rocketmq.connect.hologres.config.HologresConstant.SOURCE_IGNORE_DELETE;
+import static org.apache.rocketmq.connect.hologres.config.HologresConstant.SOURCE_IGNORE_DELETE_DEFAULT;
+import static org.apache.rocketmq.connect.hologres.config.HologresConstant.SOURCE_RETRY_COUNT;
+import static org.apache.rocketmq.connect.hologres.config.HologresConstant.SOURCE_RETRY_COUNT_DEFAULT;
+
+public class HologresSourceConfig extends AbstractHologresConfig {
+
+ private int binlogReadBatchSize;
+ private int binlogHeartBeatIntervalMs;
+ private boolean binlogIgnoreDelete;
+ private boolean binlogIgnoreBeforeUpdate;
+ private int retryCount;
+ private int binlogCommitTimeIntervalMs;
+
+ private String slotName;
+ // format: YYYY-mm-dd HH:mm:ss
+ private String startTime;
+
+ public HologresSourceConfig(KeyValue keyValue) {
+ super(keyValue);
+
+ this.binlogReadBatchSize = keyValue.getInt(SOURCE_BATCH_SIZE, SOURCE_BATCH_SIZE_DEFAULT);
+ this.binlogHeartBeatIntervalMs = keyValue.getInt(SOURCE_HEARTBEAT_INTERVAL, SOURCE_HEARTBEAT_INTERVAL_DEFAULT);
+ this.binlogIgnoreDelete = Boolean.parseBoolean(keyValue.getString(SOURCE_IGNORE_DELETE, SOURCE_IGNORE_DELETE_DEFAULT));
+ this.binlogIgnoreBeforeUpdate = Boolean.parseBoolean(keyValue.getString(SOURCE_IGNORE_BEFORE_UPDATE, SOURCE_IGNORE_BEFORE_UPDATE_DEFAULT));
+ this.retryCount = keyValue.getInt(SOURCE_RETRY_COUNT, SOURCE_RETRY_COUNT_DEFAULT);
+ this.binlogCommitTimeIntervalMs = keyValue.getInt(SOURCE_COMMIT_TIME_INTERVAL, SOURCE_COMMIT_TIME_INTERVAL_DEFAULT);
+ }
+
+ public int getBinlogReadBatchSize() {
+ return binlogReadBatchSize;
+ }
+
+ public void setBinlogReadBatchSize(int binlogReadBatchSize) {
+ this.binlogReadBatchSize = binlogReadBatchSize;
+ }
+
+ public int getBinlogHeartBeatIntervalMs() {
+ return binlogHeartBeatIntervalMs;
+ }
+
+ public void setBinlogHeartBeatIntervalMs(int binlogHeartBeatIntervalMs) {
+ this.binlogHeartBeatIntervalMs = binlogHeartBeatIntervalMs;
+ }
+
+ public boolean isBinlogIgnoreDelete() {
+ return binlogIgnoreDelete;
+ }
+
+ public void setBinlogIgnoreDelete(boolean binlogIgnoreDelete) {
+ this.binlogIgnoreDelete = binlogIgnoreDelete;
+ }
+
+ public boolean isBinlogIgnoreBeforeUpdate() {
+ return binlogIgnoreBeforeUpdate;
+ }
+
+ public void setBinlogIgnoreBeforeUpdate(boolean binlogIgnoreBeforeUpdate) {
+ this.binlogIgnoreBeforeUpdate = binlogIgnoreBeforeUpdate;
+ }
+
+ public int getRetryCount() {
+ return retryCount;
+ }
+
+ public void setRetryCount(int retryCount) {
+ this.retryCount = retryCount;
+ }
+
+ public String getSlotName() {
+ return slotName;
+ }
+
+ public void setSlotName(String slotName) {
+ this.slotName = slotName;
+ }
+
+ public String getStartTime() {
+ return startTime;
+ }
+
+ public void setStartTime(String startTime) {
+ this.startTime = startTime;
+ }
+
+ public int getBinlogCommitTimeIntervalMs() {
+ return binlogCommitTimeIntervalMs;
+ }
+
+ public void setBinlogCommitTimeIntervalMs(int binlogCommitTimeIntervalMs) {
+ this.binlogCommitTimeIntervalMs = binlogCommitTimeIntervalMs;
+ }
+}
diff --git a/connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/connector/HologresSinkConnector.java b/connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/connector/HologresSinkConnector.java
new file mode 100644
index 000000000..5c0cd09c1
--- /dev/null
+++ b/connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/connector/HologresSinkConnector.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.hologres.connector;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.Task;
+import io.openmessaging.connector.api.component.task.sink.SinkConnector;
+import org.apache.rocketmq.connect.hologres.config.HologresSinkConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class HologresSinkConnector extends SinkConnector {
+ private static final Logger log = LoggerFactory.getLogger(HologresSinkConnector.class);
+
+ private KeyValue keyValue;
+
+ @Override
+ public void validate(KeyValue config) {
+ for (String requestKey : HologresSinkConfig.REQUEST_CONFIG) {
+ if (!config.containsKey(requestKey)) {
+ throw new RuntimeException("Request config key not exist: " + requestKey);
+ }
+ }
+ }
+
+ @Override
+ public List taskConfigs(int maxTasks) {
+ log.info("Init {} task config", maxTasks);
+ List configs = new ArrayList<>();
+ for (int i = 0; i < maxTasks; i++) {
+ configs.add(this.keyValue);
+ }
+ return configs;
+ }
+
+ @Override
+ public Class extends Task> taskClass() {
+ return HologresSinkTask.class;
+ }
+
+ @Override
+ public void start(KeyValue keyValue) {
+ this.keyValue = keyValue;
+ }
+
+ @Override
+ public void stop() {
+ this.keyValue = null;
+ }
+}
diff --git a/connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/connector/HologresSinkTask.java b/connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/connector/HologresSinkTask.java
new file mode 100644
index 000000000..ba6dd917e
--- /dev/null
+++ b/connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/connector/HologresSinkTask.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.hologres.connector;
+
+import com.alibaba.hologres.client.HoloClient;
+import com.alibaba.hologres.client.HoloConfig;
+import com.alibaba.hologres.client.Put;
+import com.alibaba.hologres.client.exception.HoloClientException;
+import com.alibaba.hologres.client.model.TableSchema;
+import com.alibaba.hologres.client.model.WriteMode;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.sink.SinkTask;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.Field;
+import io.openmessaging.connector.api.data.Struct;
+import io.openmessaging.connector.api.errors.ConnectException;
+import io.openmessaging.connector.api.errors.RetriableException;
+import org.apache.rocketmq.connect.hologres.config.HologresSinkConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class HologresSinkTask extends SinkTask {
+ private static final Logger log = LoggerFactory.getLogger(HologresSinkTask.class);
+
+ private KeyValue keyValue;
+ private HologresSinkConfig sinkConfig;
+ private HoloConfig holoClientConfig;
+ private HoloClient holoClient;
+ private final Map tableSchemaCache = new HashMap<>();
+
+ @Override
+ public void put(List records) throws ConnectException {
+ if (records == null || records.isEmpty()) {
+ return;
+ }
+
+ log.info("Received {} sink records.", records.size());
+
+ try {
+ if (!tableSchemaCache.containsKey(sinkConfig.getTable())) {
+ tableSchemaCache.put(sinkConfig.getTable(), holoClient.getTableSchema(sinkConfig.getTable()));
+ }
+ TableSchema tableSchema = tableSchemaCache.get(sinkConfig.getTable());
+ // TODO: check record match table schema
+ log.info("get table schema {}", tableSchema);
+
+ List puts = new ArrayList<>();
+ for (ConnectRecord record : records) {
+ log.info("Convert record: {}", record);
+ Put put = new Put(tableSchema);
+ List fields = record.getSchema().getFields();
+ Struct structData = (Struct) record.getData();
+ fields.forEach(field -> put.setObject(field.getName(), structData.get(field)));
+ puts.add(put);
+ }
+
+ holoClient.put(puts);
+ log.info("Put {} record to hologres success", puts.size());
+ } catch (HoloClientException e) {
+ log.error("Put record to hologres failed", e);
+ throw new RetriableException(e);
+ }
+ }
+
+ @Override
+ public void start(KeyValue keyValue) {
+ this.keyValue = keyValue;
+ this.sinkConfig = new HologresSinkConfig(keyValue);
+ this.holoClientConfig = buildHoloConfig(sinkConfig);
+ log.info("Initializing hologres client");
+ try {
+ this.holoClient = new HoloClient(holoClientConfig);
+ this.holoClient.setAsyncCommit(false);
+ } catch (HoloClientException e) {
+ log.error("Init hologres client failed", e);
+ throw new RuntimeException(e);
+ }
+ log.info("Hologres client started.");
+ }
+
+ private HoloConfig buildHoloConfig(HologresSinkConfig sinkConfig) {
+ HoloConfig holoConfig = new HoloConfig();
+ holoConfig.setJdbcUrl(sinkConfig.getJdbcUrl());
+ holoConfig.setUsername(sinkConfig.getUsername());
+ holoConfig.setPassword(sinkConfig.getPassword());
+ // TODO: support more configuration
+ holoConfig.setDynamicPartition(sinkConfig.isDynamicPartition());
+ holoConfig.setWriteMode(WriteMode.valueOf(sinkConfig.getWriteMode()));
+ return holoConfig;
+ }
+
+ @Override
+ public void stop() {
+ log.info("Stopping hologres client");
+ this.holoClient.close();
+ }
+}
diff --git a/connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/connector/HologresSourceConnector.java b/connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/connector/HologresSourceConnector.java
new file mode 100644
index 000000000..eb7291114
--- /dev/null
+++ b/connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/connector/HologresSourceConnector.java
@@ -0,0 +1,53 @@
+package org.apache.rocketmq.connect.hologres.connector;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.Task;
+import io.openmessaging.connector.api.component.task.source.SourceConnector;
+import org.apache.rocketmq.connect.hologres.config.HologresSinkConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Source connector that fans a single Hologres binlog-subscription configuration
+ * out to {@code maxTasks} identical {@link HologresSourceTask} instances.
+ */
+public class HologresSourceConnector extends SourceConnector {
+    private static final Logger log = LoggerFactory.getLogger(HologresSourceConnector.class);
+
+    // Raw connector configuration, handed unchanged to every task.
+    private KeyValue keyValue;
+
+    /**
+     * Ensure all mandatory configuration keys are present.
+     *
+     * NOTE(review): this checks HologresSinkConfig.REQUEST_CONFIG; if the source side
+     * requires different keys (e.g. slot name, start time), this should reference the
+     * source config's required-key list instead -- confirm.
+     *
+     * @throws RuntimeException if a required key is missing
+     */
+    @Override
+    public void validate(KeyValue config) {
+        for (String requestKey : HologresSinkConfig.REQUEST_CONFIG) {
+            if (!config.containsKey(requestKey)) {
+                throw new RuntimeException("Request config key not exist: " + requestKey);
+            }
+        }
+    }
+
+    /**
+     * Every task receives the same configuration; parallelism is bounded by maxTasks.
+     */
+    @Override
+    public List<KeyValue> taskConfigs(int maxTasks) {
+        log.info("Init {} source task config", maxTasks);
+        List<KeyValue> configs = new ArrayList<>();
+        for (int i = 0; i < maxTasks; i++) {
+            configs.add(this.keyValue);
+        }
+        return configs;
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return HologresSourceTask.class;
+    }
+
+    @Override
+    public void start(KeyValue keyValue) {
+        log.info("HologresSourceConnector start enter");
+        this.keyValue = keyValue;
+    }
+
+    @Override
+    public void stop() {
+        // Fixed copy-paste defect: this previously logged "start enter" from stop().
+        log.info("HologresSourceConnector stop enter");
+        this.keyValue = null;
+    }
+}
diff --git a/connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/connector/HologresSourceTask.java b/connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/connector/HologresSourceTask.java
new file mode 100644
index 000000000..48d46ade2
--- /dev/null
+++ b/connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/connector/HologresSourceTask.java
@@ -0,0 +1,181 @@
+package org.apache.rocketmq.connect.hologres.connector;
+
+import com.alibaba.hologres.client.BinlogShardGroupReader;
+import com.alibaba.hologres.client.HoloClient;
+import com.alibaba.hologres.client.HoloConfig;
+import com.alibaba.hologres.client.Subscribe;
+import com.alibaba.hologres.client.exception.HoloClientException;
+import com.alibaba.hologres.client.model.Column;
+import com.alibaba.hologres.client.model.binlog.BinlogHeartBeatRecord;
+import com.alibaba.hologres.client.model.binlog.BinlogRecord;
+import com.alibaba.hologres.com.google.common.base.Strings;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.source.SourceTask;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.Field;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SchemaBuilder;
+import io.openmessaging.connector.api.data.Struct;
+import org.apache.rocketmq.connect.hologres.config.HologresSourceConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.rocketmq.connect.hologres.config.HologresConstant.PARTITION_INFO_KEY;
+import static org.apache.rocketmq.connect.hologres.config.HologresConstant.PARTITION_INDEX_KEY;
+import static org.apache.rocketmq.connect.hologres.config.HologresConstant.HOLOGRES_POSITION;
+
+
+/**
+ * Source task that subscribes to a Hologres table's binlog via holo-client and
+ * converts each {@link BinlogRecord} into an OpenMessaging {@link ConnectRecord}.
+ */
+public class HologresSourceTask extends SourceTask {
+    private static final Logger log = LoggerFactory.getLogger(HologresSourceTask.class);
+
+    // Commit the reader position once every COMMIT_BATCH polled records.
+    private static final long COMMIT_BATCH = 1000;
+
+    private KeyValue keyValue;
+    private HologresSourceConfig sourceConfig;
+    private HoloConfig holoClientConfig;
+    private HoloClient holoClient;
+    private BinlogShardGroupReader reader;
+    // Number of data records polled so far; drives periodic commits.
+    private long count = 0;
+
+    /**
+     * Pull one binlog record and convert it. Heartbeat records carry no data and
+     * yield {@code null}; read errors are logged and an empty list is returned so
+     * the framework keeps polling (deliberate best-effort behavior).
+     */
+    @Override
+    public List<ConnectRecord> poll() throws InterruptedException {
+        List<ConnectRecord> records = new ArrayList<>();
+        try {
+            BinlogRecord record = reader.getBinlogRecord();
+
+            if (record instanceof BinlogHeartBeatRecord) {
+                return null;
+            }
+
+            // Periodically persist the reader position so a restart does not replay everything.
+            if (++count % COMMIT_BATCH == 0) {
+                reader.commit(sourceConfig.getBinlogCommitTimeIntervalMs());
+            }
+
+            records.add(hologresRecord2ConnectRecord(record));
+        } catch (Exception e) {
+            log.error("Error while polling data from Hologres", e);
+        }
+        return records;
+    }
+
+    // Build a ConnectRecord (partition, offset, schema, payload) from one binlog row.
+    private ConnectRecord hologresRecord2ConnectRecord(BinlogRecord record) {
+        List<Field> fields = buildFields(record);
+        Schema schema = SchemaBuilder.struct()
+            .name(record.getTableName().getTableName())
+            .build();
+        schema.setFields(fields);
+        return new ConnectRecord(
+            buildRecordPartition(record),
+            buildRecordOffset(record),
+            System.currentTimeMillis(),
+            schema,
+            buildPayLoad(fields, schema, record));
+    }
+
+    // Partition identity = Hologres partition info + shard/partition index.
+    private RecordPartition buildRecordPartition(BinlogRecord record) {
+        Map<String, String> partitionMap = new HashMap<>();
+        partitionMap.put(PARTITION_INFO_KEY, record.getSchema().getPartitionInfo());
+        partitionMap.put(PARTITION_INDEX_KEY, String.valueOf(record.getSchema().getPartitionIndex()));
+        return new RecordPartition(partitionMap);
+    }
+
+    // Offset = binlog LSN, keyed by table name. Map is fully populated before the
+    // RecordOffset is constructed (the original mutated the map afterwards).
+    private RecordOffset buildRecordOffset(BinlogRecord record) {
+        Map<String, Long> offsetMap = new HashMap<>();
+        offsetMap.put(record.getTableName() + ":" + HOLOGRES_POSITION, record.getBinlogLsn());
+        return new RecordOffset(offsetMap);
+    }
+
+    // One Field per column, with the schema inferred from the column's current value.
+    private List<Field> buildFields(BinlogRecord record) {
+        List<Field> fields = new ArrayList<>();
+        final Column[] columns = record.getSchema().getColumnSchema();
+
+        for (int i = 0; i < columns.length; ++i) {
+            fields.add(new Field(i, columns[i].getName(), getSchema(record.getObject(i))));
+        }
+
+        return fields;
+    }
+
+    /**
+     * Map a Java value to a connect Schema. Timestamp must be tested BEFORE Date:
+     * java.sql.Timestamp extends java.util.Date, so the original order classified
+     * every timestamp as a plain time value. Unknown types fall back to string.
+     */
+    public Schema getSchema(Object obj) {
+        if (obj instanceof Integer) {
+            return SchemaBuilder.int32().build();
+        } else if (obj instanceof Long) {
+            return SchemaBuilder.int64().build();
+        } else if (obj instanceof String) {
+            return SchemaBuilder.string().build();
+        } else if (obj instanceof Timestamp) {
+            return SchemaBuilder.timestamp().build();
+        } else if (obj instanceof Date) {
+            return SchemaBuilder.time().build();
+        } else if (obj instanceof Boolean) {
+            return SchemaBuilder.bool().build();
+        }
+        return SchemaBuilder.string().build();
+    }
+
+    // Copy the row's values into a Struct, field by field, in column order.
+    private Struct buildPayLoad(List<Field> fields, Schema schema, BinlogRecord record) {
+        Struct payLoad = new Struct(schema);
+        for (int i = 0; i < fields.size(); ++i) {
+            payLoad.put(fields.get(i), record.getValues()[i]);
+        }
+        return payLoad;
+    }
+
+    /**
+     * Validate the configuration, open the Hologres client and start the binlog
+     * subscription from the configured slot and start time.
+     *
+     * @throws RuntimeException if validation fails or the client cannot be created
+     */
+    @Override
+    public void start(KeyValue keyValue) {
+        this.keyValue = keyValue;
+        this.sourceConfig = new HologresSourceConfig(keyValue);
+        this.holoClientConfig = buildHoloConfig(sourceConfig);
+        checkConfigValidate();
+
+        log.info("Initializing hologres client and binlog subscribe");
+        try {
+            this.holoClient = new HoloClient(holoClientConfig);
+            this.reader = holoClient.binlogSubscribe(Subscribe
+                .newStartTimeBuilder(sourceConfig.getTable(), sourceConfig.getSlotName())
+                .setBinlogReadStartTime(sourceConfig.getStartTime())
+                .build());
+        } catch (HoloClientException e) {
+            log.error("Init hologres client failed", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    // Slot name and start time are mandatory for a binlog subscription.
+    private void checkConfigValidate() {
+        if (Strings.isNullOrEmpty(sourceConfig.getSlotName())) {
+            throw new RuntimeException("Hologres source connector must set slot name.");
+        }
+
+        if (Strings.isNullOrEmpty(sourceConfig.getStartTime())) {
+            throw new RuntimeException("Hologres source connector must set start time.");
+        }
+    }
+
+    // Translate the connector-level source configuration into a holo-client HoloConfig.
+    private HoloConfig buildHoloConfig(HologresSourceConfig sourceConfig) {
+        HoloConfig holoConfig = new HoloConfig();
+        holoConfig.setJdbcUrl(sourceConfig.getJdbcUrl());
+        holoConfig.setUsername(sourceConfig.getUsername());
+        holoConfig.setPassword(sourceConfig.getPassword());
+
+        // binlog optional configuration
+        holoConfig.setBinlogReadBatchSize(sourceConfig.getBinlogReadBatchSize());
+        holoConfig.setBinlogIgnoreDelete(sourceConfig.isBinlogIgnoreDelete());
+        holoConfig.setBinlogIgnoreBeforeUpdate(sourceConfig.isBinlogIgnoreBeforeUpdate());
+        holoConfig.setBinlogHeartBeatIntervalMs(sourceConfig.getBinlogHeartBeatIntervalMs());
+        holoConfig.setRetryCount(sourceConfig.getRetryCount());
+
+        return holoConfig;
+    }
+
+    /**
+     * Stop the task and release resources. The original leaked both the binlog
+     * reader and the client; close them here, tolerating a failed start().
+     */
+    @Override
+    public void stop() {
+        log.info("Stopping HologresSourceTask");
+        if (reader != null) {
+            reader.close();
+            reader = null;
+        }
+        if (holoClient != null) {
+            holoClient.close();
+            holoClient = null;
+        }
+    }
+}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
index 496e02767..23e5d3ed3 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
@@ -388,7 +388,7 @@ public synchronized void startConnectors(Map connectorC
@Override
public void onCompletion(Throwable error, TargetState result) {
if (error != null) {
- log.error(error.getMessage());
+ log.error("Start connector failed", error);
} else {
log.info("Start connector {} and set target state {} successed!!", connectorName, result);
}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
index 79317bdb9..8e518be3b 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
@@ -250,7 +250,7 @@ private Boolean sendRecord() throws InterruptedException {
SendCallback callback = new SendCallback() {
@Override
public void onSuccess(SendResult result) {
- log.info("Successful send message to RocketMQ:{}, Topic {}", result.getMsgId(), result.getMessageQueue().getTopic());
+ log.info("Successful send message to RocketMQ, MsgId:{}, Topic {}", result.getMsgId(), result.getMessageQueue().getTopic());
// complete record
counter.completeRecord();
// commit record for custom
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/converter/record/json/JsonConverter.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/converter/record/json/JsonConverter.java
index d9e825947..cf45ad22d 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/converter/record/json/JsonConverter.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/converter/record/json/JsonConverter.java
@@ -313,7 +313,7 @@ public byte[] fromConnectData(String topic, Schema schema, Object value) {
try {
return serializer.serialize(topic, jsonValue);
} catch (Exception e) {
- throw new ConnectException("Converting Kafka Connect data to byte[] failed due to serialization error: ", e);
+ throw new ConnectException("Converting RocketMQ Connect data to byte[] failed due to serialization error: ", e);
}
}
@@ -334,7 +334,7 @@ public SchemaAndValue toConnectData(String topic, byte[] value) {
try {
jsonValue = deserializer.deserialize(topic, value);
} catch (Exception e) {
- throw new ConnectException("Converting byte[] to Kafka Connect data failed due to serialization error: ", e);
+ throw new ConnectException("Converting byte[] to RocketMQ Connect data failed due to serialization error: ", e);
}
JSONObject newJsonValue;
if (!converterConfig.schemasEnabled()) {
@@ -500,8 +500,8 @@ public JSONObject asJsonSchema(Schema schema) {
}
/**
- * Convert this object, in the org.apache.kafka.connect.data format, into a JSON object, returning both the schema
- * and the converted object.
+ * Convert this object, in the io.openmessaging.connector.api.data.ConnectRecord format,
+ * into a JSON object, returning both the schema and the converted object.
*/
private Object convertToJson(Schema schema, Object value) {
if (value == null) {