From e67758fe55162bec8124845a1fd1492da2954133 Mon Sep 17 00:00:00 2001 From: wangfan <1841788037@qq.com> Date: Wed, 31 May 2023 17:11:42 +0800 Subject: [PATCH] support hologres connector --- README.md | 2 +- .../rocketmq-connect-hologres/.gitignore | 38 ++++ connectors/rocketmq-connect-hologres/pom.xml | 87 +++++++++ .../rocketmq/connect/hologres/README.md | 89 ++++++++++ .../config/AbstractHologresConfig.java | 107 +++++++++++ .../connect/hologres/config/ConfigUtils.java | 69 +++++++ .../hologres/config/HologresConstant.java | 60 +++++++ .../hologres/config/HologresSinkConfig.java | 53 ++++++ .../hologres/config/HologresSourceConfig.java | 122 +++++++++++++ .../connector/HologresSinkConnector.java | 68 +++++++ .../hologres/connector/HologresSinkTask.java | 117 ++++++++++++ .../connector/HologresSourceConnector.java | 53 ++++++ .../connector/HologresSourceTask.java | 168 ++++++++++++++++++ .../runtime/connectorwrapper/Worker.java | 2 +- .../connectorwrapper/WorkerSourceTask.java | 2 +- .../converter/record/json/JsonConverter.java | 8 +- 16 files changed, 1038 insertions(+), 7 deletions(-) create mode 100644 connectors/rocketmq-connect-hologres/.gitignore create mode 100644 connectors/rocketmq-connect-hologres/pom.xml create mode 100644 connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/README.md create mode 100644 connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/config/AbstractHologresConfig.java create mode 100644 connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/config/ConfigUtils.java create mode 100644 connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/config/HologresConstant.java create mode 100644 connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/config/HologresSinkConfig.java create mode 100644 connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/config/HologresSourceConfig.java create mode 100644 connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/connector/HologresSinkConnector.java create mode 100644 connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/connector/HologresSinkTask.java create mode 100644 connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/connector/HologresSourceConnector.java create mode 100644 connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/connector/HologresSourceTask.java diff --git a/README.md b/README.md index 89b7b2e35..20c5b8ac6 100644 --- a/README.md +++ b/README.md @@ -485,7 +485,7 @@ Environment variables配置 > CONNECT_HOME=${user path}/rocketmq-connect/distribution -###集群模式启动Connect Worker +### 集群模式启动Connect Worker Main Class配置 >org.apache.rocketmq.connect.runtime.DistributedConnectStartup diff --git a/connectors/rocketmq-connect-hologres/.gitignore b/connectors/rocketmq-connect-hologres/.gitignore new file mode 100644 index 000000000..5ff6309b7 --- /dev/null +++ b/connectors/rocketmq-connect-hologres/.gitignore @@ -0,0 +1,38 @@ +target/ +!.mvn/wrapper/maven-wrapper.jar +!**/src/main/**/target/ +!**/src/test/**/target/ + +### IntelliJ IDEA ### +.idea/modules.xml +.idea/jarRepositories.xml +.idea/compiler.xml +.idea/libraries/ +*.iws +*.iml +*.ipr + +### Eclipse ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ +!**/src/main/**/build/ +!**/src/test/**/build/ + +### VS Code ### +.vscode/ + +### Mac OS ### +.DS_Store \ No newline at end of file diff --git a/connectors/rocketmq-connect-hologres/pom.xml b/connectors/rocketmq-connect-hologres/pom.xml new file mode 100644 index 000000000..8939576fb --- /dev/null +++ b/connectors/rocketmq-connect-hologres/pom.xml @@ -0,0 +1,87 @@ + + + + + 4.0.0 + + org.apache.rocketmq + rocketmq-connect-hologres + 0.0.1-SNAPSHOT + + connect-hive + https://github.com/apache/rocketmq-connect/tree/master/connectors/rocketmq-connect-hive + + + + The Apache Software License, Version 2.0 + http://www.apache.org/licenses/LICENSE-2.0.txt + + + + + jira + https://issues.apache.org/jira/browse/RocketMQ + + + + 8 + 8 + UTF-8 + UTF-8 + + + + + io.openmessaging + openmessaging-connector + 0.1.4 + + + + com.alibaba.hologres + holo-client + 2.2.8 + + + + + + + maven-assembly-plugin + 3.0.0 + + + jar-with-dependencies + + + + + make-assembly + package + + single + + + + + + + + \ No newline at end of file diff --git a/connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/README.md b/connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/README.md new file mode 100644 index 000000000..104783768 --- /dev/null +++ b/connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/README.md @@ -0,0 +1,89 @@ +**rocketmq-connect-hologres** + +在启动runtime之后,通过发送http消息到runtime,携带connector和task的参数,启动connector + +**参数说明** + +- **connector-class**: connector的类名 +- **tasks.num**: 启动的task数目 + +##### parameter configuration + +| parameter | effect | required | default | +|-----------|-----------------------------|----------|---------| +| host | The Host of the hive server | yes | null | +| port | The Port of the hive server | yes | null | +| database | The info of the database | yes | null | +| table | The info of the table | yes | null | +| username | The info of the hive server | yes | null | +| password | The info of the hive server | yes | null | + +**启动 Source Connector** +curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/hologresSourceConnector -d '{"connector.class":"org.apache.rocketmq.connect.hologres.connector.HologresSourceConnector","host":"localhost","port":"80","database":"test_db","username":"","password":"","table":"public.test_table","connect.topicname":"holoTopic","max.tasks":"2","slotName":"","startTime":"2023-05-31 12:00:00","value.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter","key.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"}' + +``` +POST http://${runtime-ip}:${runtime-port}/connectors/hologresSinkConnector +{ + "connector.class":"org.apache.rocketmq.connect.hologres.connector.HologresSinkConnector", + "host":"localhost", + "port":80, + "database":"test_db", + "table":"public.test_table", + "max.tasks":2, + "connect.topicname":"holoTopic", + "slotName":"", + "startTime":"2023-05-31 12:00:00", + "value.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter", + "key.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter" +} +``` + +##### parameter configuration + +| parameter | effect | required | default | +|----------------------------|------------------------------------------|----------|---------| +| slotName | The slotName of the binlog | yes | null | +| startTime | Where to start consume binlog | yes | null | +| binlogReadBatchSize | The read batch size of binlog reader | no | 1024 | +| binlogHeartBeatIntervalMs | The heart beat interval of binlog reader | no | -1 | +| binlogIgnoreDelete | Whether ignore Delete operation | no | false | +| binlogIgnoreBeforeUpdate | Whether ignore BeforeUpdate operation | no | false | +| retryCount | The max retry times of consume binlog | no | 3 | +| binlogCommitTimeIntervalMs | The commit interval of binlog | no | 5000 | + +**启动Sink Connector** +curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/hologresSinkConnector -d '{"connector.class":"org.apache.rocketmq.connect.hologres.connector.HologresSinkConnector","host":"localhost","port":"80","database":"test_db","username":"","password":"","table":"public.test_table","connect.topicnames":"holoTopic","max.tasks":"2","value.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter","key.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"}' + +``` +POST http://${runtime-ip}:${runtime-port}/connectors/hologresSinkConnector +{ + "connector.class":"org.apache.rocketmq.connect.hologres.connector.HologresSinkConnector", + "host":"localhost", + "port":80, + "database":"test_db", + "table":"public.test_table", + "max.tasks":2, + "connect.topicnames":"holoTopic", + "value.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter", + "key.converter":"org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter" +} +``` + +##### parameter configuration + +| parameter | effect | required | default | +|------------------|------------------------------------|----------|-------------------| +| dynamicPartition | Whether open dynamicPartition | no | false | +| writeMode | The write mode of the holo client | no | INSERT_OR_REPLACE | + +**查看Connector运行状态** + +http://127.0.0.1:8081/connectors/connector-name/status + +**查看Connector配置** + +http://127.0.0.1:8081/connectors/connector-name/config + +**关闭Connector** + +http://127.0.0.1:8081/connectors/connector-name/stop diff --git a/connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/config/AbstractHologresConfig.java b/connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/config/AbstractHologresConfig.java new file mode 100644 index 000000000..59d49192f --- /dev/null +++ b/connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/config/AbstractHologresConfig.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.hologres.config; + +import io.openmessaging.KeyValue; + +import java.util.HashSet; +import java.util.Set; + +public class AbstractHologresConfig { + + public static final Set REQUEST_CONFIG = new HashSet(){ + { + add(HologresConstant.HOST); + add(HologresConstant.PORT); + add(HologresConstant.DATABASE); + } + }; + + private KeyValue keyValue; + private String host; + private int port; + private String username; + private String password; + private String database; + private String table; + + public AbstractHologresConfig(KeyValue keyValue) { + this.keyValue = keyValue; + ConfigUtils.load(keyValue, this); + } + + public KeyValue getKeyValue() { + return keyValue; + } + + public void setKeyValue(KeyValue keyValue) { + this.keyValue = keyValue; + } + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public int getPort() { + return port; + } + + public void setPort(int port) { + this.port = port; + } + + public String getUsername() { + return username; + } + + public void setUsername(String username) { + this.username = username; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + public String getDatabase() { + return database; + } + + public void setDatabase(String database) { + this.database = database; + } + + public String getTable() { + return table; + } + + public void setTable(String table) { + this.table = table; + } + + public String getJdbcUrl() { + return String.format("%s%s:%d/%s", HologresConstant.URL_PREFIX, getHost(), getPort(), getDatabase()); + } +} diff --git a/connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/config/ConfigUtils.java b/connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/config/ConfigUtils.java new file mode 100644 index 000000000..aed52d8d5 --- /dev/null +++ b/connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/config/ConfigUtils.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.hologres.config; + +import io.openmessaging.KeyValue; + +import java.lang.reflect.Method; + +public class ConfigUtils { + + public static void load(KeyValue keyValue, Object object) { + properties2Object(keyValue, object); + } + + private static void properties2Object(final KeyValue p, final Object object) { + Method[] methods = object.getClass().getMethods(); + for (Method method : methods) { + String mn = method.getName(); + if (mn.startsWith("set")) { + try { + String tmp = mn.substring(4); + String first = mn.substring(3, 4); + + String key = first.toLowerCase() + tmp; + String property = p.getString(key); + if (property != null) { + Class[] pt = method.getParameterTypes(); + if (pt != null && pt.length > 0) { + String cn = pt[0].getSimpleName(); + Object arg; + if (cn.equals("int") || cn.equals("Integer")) { + arg = Integer.parseInt(property); + } else if (cn.equals("long") || cn.equals("Long")) { + arg = Long.parseLong(property); + } else if (cn.equals("double") || cn.equals("Double")) { + arg = Double.parseDouble(property); + } else if (cn.equals("boolean") || cn.equals("Boolean")) { + arg = Boolean.parseBoolean(property); + } else if (cn.equals("float") || cn.equals("Float")) { + arg = Float.parseFloat(property); + } else if (cn.equals("String")) { + arg = property; + } else { + continue; + } + method.invoke(object, arg); + } + } + } catch (Throwable ignored) { + } + } + } + } +} diff --git a/connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/config/HologresConstant.java b/connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/config/HologresConstant.java new file mode 100644 index 000000000..f1abacff7 --- /dev/null +++ b/connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/config/HologresConstant.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.hologres.config; + +public class HologresConstant { + + public static String TASK_NUM = "tasks.num"; + public static final String DRIVER_NAME = "com.aliyun.hologres.jdbc.HoloDriver"; + public static final String URL_PREFIX = "jdbc:hologres://"; + public static final String USERNAME = "username"; + public static final String PASSWORD = "password"; + public static final String HOST = "host"; + public static final String PORT = "port"; + public static final String DATABASE = "database"; + public static final String TABLES = "tables"; + public static final String TABLE_NAME = "tableName"; + + // source related + public static final String SOURCE_BATCH_SIZE = "binlogReadBatchSize"; + public static final int SOURCE_BATCH_SIZE_DEFAULT = 1024; + public static final String SOURCE_HEARTBEAT_INTERVAL = "binlogHeartBeatIntervalMs"; + public static final int SOURCE_HEARTBEAT_INTERVAL_DEFAULT = -1; + public static final String SOURCE_IGNORE_DELETE = "binlogIgnoreDelete"; + public static final String SOURCE_IGNORE_DELETE_DEFAULT = "false"; + public static final String SOURCE_IGNORE_BEFORE_UPDATE = "binlogIgnoreBeforeUpdate"; + public static final String SOURCE_IGNORE_BEFORE_UPDATE_DEFAULT = "false"; + public static final String SOURCE_RETRY_COUNT = "retryCount"; + public static final int SOURCE_RETRY_COUNT_DEFAULT = 3; + public static final String SOURCE_COMMIT_TIME_INTERVAL = "binlogCommitIntervalMs"; + public static final int SOURCE_COMMIT_TIME_INTERVAL_DEFAULT = 5000; + public static final String SOURCE_SLOT_NAME = "slotName"; + public static final String PARTITION_INFO_KEY = "partitionInfo"; + public static final String PARTITION_INDEX_KEY = "partitionIndex"; + public static final String HOLOGRES_POSITION = "HOLOGRES_POSITION"; + + // sink related + public static final String DYNAMIC_PARTITION = "dynamicPartition"; + public static final String DYNAMIC_PARTITION_DEFAULT = "false"; + public static final String WRITE_MODE = "writeMode"; + public static final String WRITE_MODE_DEFAULT = "INSERT_OR_REPLACE"; + + public static final String WRITE_BATCH = "writeBatch"; + public static final int WRITE_BATCH_DEFAULT = 512; + +} diff --git a/connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/config/HologresSinkConfig.java b/connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/config/HologresSinkConfig.java new file mode 100644 index 000000000..aecbc8341 --- /dev/null +++ b/connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/config/HologresSinkConfig.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.hologres.config; + +import io.openmessaging.KeyValue; + +import static org.apache.rocketmq.connect.hologres.config.HologresConstant.DYNAMIC_PARTITION; +import static org.apache.rocketmq.connect.hologres.config.HologresConstant.DYNAMIC_PARTITION_DEFAULT; +import static org.apache.rocketmq.connect.hologres.config.HologresConstant.WRITE_MODE; +import static org.apache.rocketmq.connect.hologres.config.HologresConstant.WRITE_MODE_DEFAULT; + +public class HologresSinkConfig extends AbstractHologresConfig { + + private boolean dynamicPartition; + private String writeMode; + + public HologresSinkConfig(KeyValue keyValue) { + super(keyValue); + this.dynamicPartition = Boolean.parseBoolean(keyValue.getString(DYNAMIC_PARTITION, DYNAMIC_PARTITION_DEFAULT)); + this.writeMode = keyValue.getString(WRITE_MODE, WRITE_MODE_DEFAULT); + } + + public boolean isDynamicPartition() { + return dynamicPartition; + } + + public void setDynamicPartition(boolean dynamicPartition) { + this.dynamicPartition = dynamicPartition; + } + + public String getWriteMode() { + return writeMode; + } + + public void setWriteMode(String writeMode) { + this.writeMode = writeMode; + } +} diff --git a/connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/config/HologresSourceConfig.java b/connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/config/HologresSourceConfig.java new file mode 100644 index 000000000..4e6bd766d --- /dev/null +++ b/connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/config/HologresSourceConfig.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.hologres.config; + +import io.openmessaging.KeyValue; + +import static org.apache.rocketmq.connect.hologres.config.HologresConstant.SOURCE_BATCH_SIZE; +import static org.apache.rocketmq.connect.hologres.config.HologresConstant.SOURCE_BATCH_SIZE_DEFAULT; +import static org.apache.rocketmq.connect.hologres.config.HologresConstant.SOURCE_COMMIT_TIME_INTERVAL; +import static org.apache.rocketmq.connect.hologres.config.HologresConstant.SOURCE_COMMIT_TIME_INTERVAL_DEFAULT; +import static org.apache.rocketmq.connect.hologres.config.HologresConstant.SOURCE_HEARTBEAT_INTERVAL; +import static org.apache.rocketmq.connect.hologres.config.HologresConstant.SOURCE_HEARTBEAT_INTERVAL_DEFAULT; +import static org.apache.rocketmq.connect.hologres.config.HologresConstant.SOURCE_IGNORE_BEFORE_UPDATE; +import static org.apache.rocketmq.connect.hologres.config.HologresConstant.SOURCE_IGNORE_BEFORE_UPDATE_DEFAULT; +import static org.apache.rocketmq.connect.hologres.config.HologresConstant.SOURCE_IGNORE_DELETE; +import static org.apache.rocketmq.connect.hologres.config.HologresConstant.SOURCE_IGNORE_DELETE_DEFAULT; +import static org.apache.rocketmq.connect.hologres.config.HologresConstant.SOURCE_RETRY_COUNT; +import static org.apache.rocketmq.connect.hologres.config.HologresConstant.SOURCE_RETRY_COUNT_DEFAULT; + +public class HologresSourceConfig extends AbstractHologresConfig { + + private int binlogReadBatchSize; + private int binlogHeartBeatIntervalMs; + private boolean binlogIgnoreDelete; + private boolean binlogIgnoreBeforeUpdate; + private int retryCount; + private int binlogCommitTimeIntervalMs; + + private String slotName; + // format: YYYY-mm-dd HH:mm:ss + private String startTime; + + public HologresSourceConfig(KeyValue keyValue) { + super(keyValue); + + this.binlogReadBatchSize = keyValue.getInt(SOURCE_BATCH_SIZE, SOURCE_BATCH_SIZE_DEFAULT); + this.binlogHeartBeatIntervalMs = keyValue.getInt(SOURCE_HEARTBEAT_INTERVAL, SOURCE_HEARTBEAT_INTERVAL_DEFAULT); + this.binlogIgnoreDelete = Boolean.parseBoolean(keyValue.getString(SOURCE_IGNORE_DELETE, SOURCE_IGNORE_DELETE_DEFAULT)); + this.binlogIgnoreBeforeUpdate = Boolean.parseBoolean(keyValue.getString(SOURCE_IGNORE_BEFORE_UPDATE, SOURCE_IGNORE_BEFORE_UPDATE_DEFAULT)); + this.retryCount = keyValue.getInt(SOURCE_RETRY_COUNT, SOURCE_RETRY_COUNT_DEFAULT); + this.binlogCommitTimeIntervalMs = keyValue.getInt(SOURCE_COMMIT_TIME_INTERVAL, SOURCE_COMMIT_TIME_INTERVAL_DEFAULT); + } + + public int getBinlogReadBatchSize() { + return binlogReadBatchSize; + } + + public void setBinlogReadBatchSize(int binlogReadBatchSize) { + this.binlogReadBatchSize = binlogReadBatchSize; + } + + public int getBinlogHeartBeatIntervalMs() { + return binlogHeartBeatIntervalMs; + } + + public void setBinlogHeartBeatIntervalMs(int binlogHeartBeatIntervalMs) { + this.binlogHeartBeatIntervalMs = binlogHeartBeatIntervalMs; + } + + public boolean isBinlogIgnoreDelete() { + return binlogIgnoreDelete; + } + + public void setBinlogIgnoreDelete(boolean binlogIgnoreDelete) { + this.binlogIgnoreDelete = binlogIgnoreDelete; + } + + public boolean isBinlogIgnoreBeforeUpdate() { + return binlogIgnoreBeforeUpdate; + } + + public void setBinlogIgnoreBeforeUpdate(boolean binlogIgnoreBeforeUpdate) { + this.binlogIgnoreBeforeUpdate = binlogIgnoreBeforeUpdate; + } + + public int getRetryCount() { + return retryCount; + } + + public void setRetryCount(int retryCount) { + this.retryCount = retryCount; + } + + public String getSlotName() { + return slotName; + } + + public void setSlotName(String slotName) { + this.slotName = slotName; + } + + public String getStartTime() { + return startTime; + } + + public void setStartTime(String startTime) { + this.startTime = startTime; + } + + public int getBinlogCommitTimeIntervalMs() { + return binlogCommitTimeIntervalMs; + } + + public void setBinlogCommitTimeIntervalMs(int binlogCommitTimeIntervalMs) { + this.binlogCommitTimeIntervalMs = binlogCommitTimeIntervalMs; + } +} diff --git a/connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/connector/HologresSinkConnector.java b/connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/connector/HologresSinkConnector.java new file mode 100644 index 000000000..5c0cd09c1 --- /dev/null +++ b/connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/connector/HologresSinkConnector.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.hologres.connector; + +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.component.task.Task; +import io.openmessaging.connector.api.component.task.sink.SinkConnector; +import org.apache.rocketmq.connect.hologres.config.HologresSinkConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; + +public class HologresSinkConnector extends SinkConnector { + private static final Logger log = LoggerFactory.getLogger(HologresSinkConnector.class); + + private KeyValue keyValue; + + @Override + public void validate(KeyValue config) { + for (String requestKey : HologresSinkConfig.REQUEST_CONFIG) { + if (!config.containsKey(requestKey)) { + throw new RuntimeException("Request config key not exist: " + requestKey); + } + } + } + + @Override + public List taskConfigs(int maxTasks) { + log.info("Init {} task config", maxTasks); + List configs = new ArrayList<>(); + for (int i = 0; i < maxTasks; i++) { + configs.add(this.keyValue); + } + return configs; + } + + @Override + public Class taskClass() { + return HologresSinkTask.class; + } + + @Override + public void start(KeyValue keyValue) { + this.keyValue = keyValue; + } + + @Override + public void stop() { + this.keyValue = null; + } +} diff --git a/connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/connector/HologresSinkTask.java b/connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/connector/HologresSinkTask.java new file mode 100644 index 000000000..ba6dd917e --- /dev/null +++ b/connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/connector/HologresSinkTask.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.connect.hologres.connector; + +import com.alibaba.hologres.client.HoloClient; +import com.alibaba.hologres.client.HoloConfig; +import com.alibaba.hologres.client.Put; +import com.alibaba.hologres.client.exception.HoloClientException; +import com.alibaba.hologres.client.model.TableSchema; +import com.alibaba.hologres.client.model.WriteMode; +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.component.task.sink.SinkTask; +import io.openmessaging.connector.api.data.ConnectRecord; +import io.openmessaging.connector.api.data.Field; +import io.openmessaging.connector.api.data.Struct; +import io.openmessaging.connector.api.errors.ConnectException; +import io.openmessaging.connector.api.errors.RetriableException; +import org.apache.rocketmq.connect.hologres.config.HologresSinkConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class HologresSinkTask extends SinkTask { + private static final Logger log = LoggerFactory.getLogger(HologresSinkTask.class); + + private KeyValue keyValue; + private HologresSinkConfig sinkConfig; + private HoloConfig holoClientConfig; + private HoloClient holoClient; + private final Map tableSchemaCache = new HashMap<>(); + + @Override + public void put(List records) throws ConnectException { + if (records == null || records.isEmpty()) { + return; + } + + log.info("Received {} sink records.", records.size()); + + try { + if (!tableSchemaCache.containsKey(sinkConfig.getTable())) { + tableSchemaCache.put(sinkConfig.getTable(), holoClient.getTableSchema(sinkConfig.getTable())); + } + TableSchema tableSchema = tableSchemaCache.get(sinkConfig.getTable()); + // TODO: check record match table schema + log.info("get table schema {}", tableSchema); + + List puts = new ArrayList<>(); + for (ConnectRecord record : records) { + log.info("Convert record: {}", record); + Put put = new Put(tableSchema); + List fields = record.getSchema().getFields(); + Struct structData = (Struct) record.getData(); + fields.forEach(field -> put.setObject(field.getName(), structData.get(field))); + puts.add(put); + } + + holoClient.put(puts); + log.info("Put {} record to hologres success", puts.size()); + } catch (HoloClientException e) { + log.error("Put record to hologres failed", e); + throw new RetriableException(e); + } + } + + @Override + public void start(KeyValue keyValue) { + this.keyValue = keyValue; + this.sinkConfig = new HologresSinkConfig(keyValue); + this.holoClientConfig = buildHoloConfig(sinkConfig); + log.info("Initializing hologres client"); + try { + this.holoClient = new HoloClient(holoClientConfig); + this.holoClient.setAsyncCommit(false); + } catch (HoloClientException e) { + log.error("Init hologres client failed", e); + throw new RuntimeException(e); + } + log.info("Hologres client started."); + } + + private HoloConfig buildHoloConfig(HologresSinkConfig sinkConfig) { + HoloConfig holoConfig = new HoloConfig(); + holoConfig.setJdbcUrl(sinkConfig.getJdbcUrl()); + holoConfig.setUsername(sinkConfig.getUsername()); + holoConfig.setPassword(sinkConfig.getPassword()); + // TODO: support more configuration + holoConfig.setDynamicPartition(sinkConfig.isDynamicPartition()); + holoConfig.setWriteMode(WriteMode.valueOf(sinkConfig.getWriteMode())); + return holoConfig; + } + + @Override + public void stop() { + log.info("Stopping hologres client"); + this.holoClient.close(); + } +} diff --git a/connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/connector/HologresSourceConnector.java b/connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/connector/HologresSourceConnector.java new file mode 100644 index 000000000..eb7291114 --- /dev/null +++ b/connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/connector/HologresSourceConnector.java @@ -0,0 +1,53 @@ +package org.apache.rocketmq.connect.hologres.connector; + +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.component.task.Task; +import io.openmessaging.connector.api.component.task.source.SourceConnector; +import org.apache.rocketmq.connect.hologres.config.HologresSinkConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; + +public class HologresSourceConnector extends SourceConnector { + private static final Logger log = LoggerFactory.getLogger(HologresSourceConnector.class); + + private KeyValue keyValue; + + @Override + public void validate(KeyValue config) { + for (String requestKey : HologresSinkConfig.REQUEST_CONFIG) { + if (!config.containsKey(requestKey)) { + throw new RuntimeException("Request config key not exist: " + requestKey); + } + } + } + + @Override + public List taskConfigs(int maxTasks) { + log.info("Init {} source task config", maxTasks); + List configs = new ArrayList<>(); + for (int i = 0; i < maxTasks; i++) { + configs.add(this.keyValue); + } + return configs; + } + + @Override + public Class taskClass() { + return HologresSourceTask.class; + } + + @Override + public void start(KeyValue keyValue) { + log.info("HologresSourceConnector start enter"); + this.keyValue = keyValue; + } + + @Override + public void stop() { + log.info("HologresSourceConnector start enter"); + this.keyValue = null; + } +} diff --git a/connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/connector/HologresSourceTask.java b/connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/connector/HologresSourceTask.java new file mode 100644 index 000000000..bdcaa3c69 --- /dev/null +++ b/connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/connector/HologresSourceTask.java @@ -0,0 +1,168 @@ +package org.apache.rocketmq.connect.hologres.connector; + +import com.alibaba.hologres.client.BinlogShardGroupReader; +import com.alibaba.hologres.client.HoloClient; +import com.alibaba.hologres.client.HoloConfig; +import com.alibaba.hologres.client.Subscribe; +import com.alibaba.hologres.client.exception.HoloClientException; +import com.alibaba.hologres.client.model.Column; +import com.alibaba.hologres.client.model.binlog.BinlogHeartBeatRecord; +import com.alibaba.hologres.client.model.binlog.BinlogRecord; +import com.alibaba.hologres.com.google.common.base.Strings; +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.component.task.source.SourceTask; +import io.openmessaging.connector.api.data.*; +import org.apache.rocketmq.connect.hologres.config.HologresSourceConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Timestamp; +import java.util.*; + +import static org.apache.rocketmq.connect.hologres.config.HologresConstant.*; + +public class HologresSourceTask extends SourceTask { + private static final Logger log = LoggerFactory.getLogger(HologresSourceTask.class); + + private KeyValue keyValue; + private HologresSourceConfig sourceConfig; + private HoloConfig holoClientConfig; + private HoloClient holoClient; + private BinlogShardGroupReader reader; + private long count = 0; + + @Override + public List poll() throws InterruptedException { + List records = new ArrayList<>(); + try { + BinlogRecord record = reader.getBinlogRecord(); + + if (record instanceof BinlogHeartBeatRecord) { + return null; + } + + if (++count % 1000 == 0) { + reader.commit(sourceConfig.getBinlogCommitTimeIntervalMs()); + } + + records.add(hologresRecord2ConnectRecord(record)); + } catch (Exception e) { + log.error("Error while polling data from Hologres", e); + } + return records; + } + + private ConnectRecord hologresRecord2ConnectRecord(BinlogRecord record) { + List fields = buildFields(record); + Schema schema = SchemaBuilder.struct() + .name(record.getTableName().getTableName()) + .build(); + schema.setFields(fields); + return new ConnectRecord( + buildRecordPartition(record), + buildRecordOffset(record), + System.currentTimeMillis(), + schema, + buildPayLoad(fields, schema, record)); + } + + private RecordPartition buildRecordPartition(BinlogRecord record) { + Map partitionMap = new HashMap<>(); + partitionMap.put(PARTITION_INFO_KEY, record.getSchema().getPartitionInfo()); + partitionMap.put(PARTITION_INDEX_KEY, String.valueOf(record.getSchema().getPartitionIndex())); + return new RecordPartition(partitionMap); + } + + private RecordOffset buildRecordOffset(BinlogRecord record) { + Map offsetMap = new HashMap<>(); + RecordOffset recordOffset = new RecordOffset(offsetMap); + offsetMap.put(record.getTableName() + ":" + HOLOGRES_POSITION, record.getBinlogLsn()); + return recordOffset; + } + + private List buildFields(BinlogRecord record) { + List fields = new ArrayList<>(); + final Column[] columns = record.getSchema().getColumnSchema(); + + for (int i = 0; i < columns.length; ++i) { + fields.add(new Field(i, columns[i].getName(), getSchema(record.getObject(i)))); + } + + return fields; + } + + public Schema getSchema(Object obj) { + if (obj instanceof Integer) { + return SchemaBuilder.int32().build(); + } else if (obj instanceof Long) { + return SchemaBuilder.int64().build(); + } else if (obj instanceof String) { + return SchemaBuilder.string().build(); + } else if (obj instanceof Date) { + return SchemaBuilder.time().build(); + } else if (obj instanceof Timestamp) { + return SchemaBuilder.timestamp().build(); + } else if (obj instanceof Boolean) { + return SchemaBuilder.bool().build(); + } + return SchemaBuilder.string().build(); + } + + private Struct buildPayLoad(List fields, Schema schema, BinlogRecord record) { + Struct payLoad = new Struct(schema); + for (int i = 0; i < fields.size(); ++i) { + payLoad.put(fields.get(i), record.getValues()[i]); + } + return payLoad; + } + + @Override + public void start(KeyValue keyValue) { + this.keyValue = keyValue; + this.sourceConfig = new HologresSourceConfig(keyValue); + this.holoClientConfig = buildHoloConfig(sourceConfig); + checkConfigValidate(); + + log.info("Initializing hologres client and binlog subscribe"); + try { + this.holoClient = new HoloClient(holoClientConfig); + this.reader = holoClient.binlogSubscribe(Subscribe + .newStartTimeBuilder(sourceConfig.getTable(), sourceConfig.getSlotName()) + .setBinlogReadStartTime(sourceConfig.getStartTime()) + .build()); + } catch (HoloClientException e) { + log.error("Init hologres client failed", e); + throw new RuntimeException(e); + } + } + + private void checkConfigValidate() { + if (Strings.isNullOrEmpty(sourceConfig.getSlotName())) { + throw new RuntimeException("Hologres source connector must set slot name."); + } + + if (Strings.isNullOrEmpty(sourceConfig.getStartTime())) { + throw new RuntimeException("Hologres source connector must set start time."); + } + } + + private HoloConfig buildHoloConfig(HologresSourceConfig sourceConfig) { + HoloConfig holoConfig = new HoloConfig(); + holoConfig.setJdbcUrl(sourceConfig.getJdbcUrl()); + holoConfig.setUsername(sourceConfig.getUsername()); + holoConfig.setPassword(sourceConfig.getPassword()); + + // binlog optional configuration + holoConfig.setBinlogReadBatchSize(sourceConfig.getBinlogReadBatchSize()); + holoConfig.setBinlogIgnoreDelete(sourceConfig.isBinlogIgnoreDelete()); + holoConfig.setBinlogIgnoreBeforeUpdate(sourceConfig.isBinlogIgnoreBeforeUpdate()); + holoConfig.setBinlogHeartBeatIntervalMs(sourceConfig.getBinlogHeartBeatIntervalMs()); + holoConfig.setRetryCount(sourceConfig.getRetryCount()); + + return holoConfig; + } + + public void stop() { + log.info("Stopping HologresSourceTask"); + } +} diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java index 496e02767..23e5d3ed3 100644 --- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java +++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java @@ -388,7 +388,7 @@ public synchronized void startConnectors(Map connectorC @Override public void onCompletion(Throwable error, TargetState result) { if (error != null) { - log.error(error.getMessage()); + log.error("Start connector failed", error); } else { log.info("Start connector {} and set target state {} successed!!", connectorName, result); } diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java index 79317bdb9..8e518be3b 100644 --- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java +++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java @@ -250,7 +250,7 @@ private Boolean sendRecord() throws InterruptedException { SendCallback callback = new SendCallback() { @Override public void onSuccess(SendResult result) { - log.info("Successful send message to RocketMQ:{}, Topic {}", result.getMsgId(), result.getMessageQueue().getTopic()); + log.info("Successful send message to RocketMQ, MsgId:{}, Topic {}", result.getMsgId(), result.getMessageQueue().getTopic()); // complete record counter.completeRecord(); // commit record for custom diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/converter/record/json/JsonConverter.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/converter/record/json/JsonConverter.java index d9e825947..cf45ad22d 100644 --- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/converter/record/json/JsonConverter.java +++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/converter/record/json/JsonConverter.java @@ -313,7 +313,7 @@ public byte[] fromConnectData(String topic, Schema schema, Object value) { try { return serializer.serialize(topic, jsonValue); } catch (Exception e) { - throw new ConnectException("Converting Kafka Connect data to byte[] failed due to serialization error: ", e); + throw new ConnectException("Converting RocketMQ Connect data to byte[] failed due to serialization error: ", e); } } @@ -334,7 +334,7 @@ public SchemaAndValue toConnectData(String topic, byte[] value) { try { jsonValue = deserializer.deserialize(topic, value); } catch (Exception e) { - throw new ConnectException("Converting byte[] to Kafka Connect data failed due to serialization error: ", e); + throw new ConnectException("Converting byte[] to RocketMQ Connect data failed due to serialization error: ", e); } JSONObject newJsonValue; if (!converterConfig.schemasEnabled()) { @@ -500,8 +500,8 @@ public JSONObject asJsonSchema(Schema schema) { } /** - * Convert this object, in the org.apache.kafka.connect.data format, into a JSON object, returning both the schema - * and the converted object. + * Convert this object, in the io.openmessaging.connector.api.data.ConnectRecord format, + * into a JSON object, returning both the schema and the converted object. */ private Object convertToJson(Schema schema, Object value) { if (value == null) {