From 18dda9b489fe68891eb1c8a8b9307c0f7b6edb0b Mon Sep 17 00:00:00 2001 From: liuchangqing Date: Thu, 6 Apr 2023 16:15:55 +0800 Subject: [PATCH] [rocketmq-replicator] Support create producer with connector's params in source task. --- .../replicator/ReplicatorSourceConnector.java | 37 ++++++---- .../replicator/ReplicatorSourceTask.java | 60 +++++++++------- .../runtime/config/ConnectorConfig.java | 10 +++ .../runtime/connectorwrapper/Worker.java | 7 +- .../connect/runtime/utils/ConnectUtil.java | 68 +++++++++++++------ 5 files changed, 119 insertions(+), 63 deletions(-) diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceConnector.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceConnector.java index efd34e1ae..c90f41689 100644 --- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceConnector.java +++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceConnector.java @@ -16,12 +16,23 @@ */ package org.apache.rocketmq.replicator; +import static org.apache.rocketmq.connect.runtime.config.ConnectorConfig.CONNECTOR_ID; +import static org.apache.rocketmq.connect.runtime.config.ConnectorConfig.ERRORS_TOLERANCE_CONFIG; +import static org.apache.rocketmq.connect.runtime.config.SourceConnectorConfig.CONNECT_TOPICNAME; import com.alibaba.fastjson.JSON; import io.openmessaging.KeyValue; import io.openmessaging.connector.api.component.task.Task; import io.openmessaging.connector.api.component.task.source.SourceConnector; import io.openmessaging.connector.api.errors.ConnectException; import io.openmessaging.internal.DefaultKeyValue; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; @@ -32,20 +43,16 @@ import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.route.QueueData; import org.apache.rocketmq.common.protocol.route.TopicRouteData; -import org.apache.rocketmq.replicator.config.*; +import org.apache.rocketmq.connect.runtime.config.ConnectorConfig; +import org.apache.rocketmq.connect.runtime.errors.ToleranceType; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.replicator.config.ConsumeFromWhere; +import org.apache.rocketmq.replicator.config.ReplicatorConnectorConfig; import org.apache.rocketmq.replicator.exception.GetMetaDataException; import org.apache.rocketmq.replicator.exception.InitMQClientException; import org.apache.rocketmq.replicator.utils.ReplicatorUtils; -import org.apache.rocketmq.connect.runtime.errors.ToleranceType; -import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; -import java.util.*; - -import static org.apache.rocketmq.connect.runtime.config.ConnectorConfig.CONNECTOR_ID; -import static org.apache.rocketmq.connect.runtime.config.ConnectorConfig.ERRORS_TOLERANCE_CONFIG; -import static org.apache.rocketmq.connect.runtime.config.SourceConnectorConfig.CONNECT_TOPICNAME; - /** * @author osgoo * @date 2022/6/16 @@ -176,11 +183,12 @@ public List taskConfigs(int maxTasks) { if (null != connectorConfig.getString(ReplicatorConnectorConfig.DEST_INSTANCEID)) { keyValue.put(ReplicatorConnectorConfig.DEST_INSTANCEID, connectorConfig.getString(ReplicatorConnectorConfig.DEST_INSTANCEID)); } - keyValue.put(ReplicatorConnectorConfig.DEST_ENDPOINT, connectorConfig.getString(ReplicatorConnectorConfig.DEST_ENDPOINT)); + keyValue.put(ConnectorConfig.RMQ_NAMESRVADDR, connectorConfig.getString(ReplicatorConnectorConfig.DEST_ENDPOINT)); keyValue.put(ReplicatorConnectorConfig.DEST_TOPIC, connectorConfig.getString(ReplicatorConnectorConfig.DEST_TOPIC)); - keyValue.put(ReplicatorConnectorConfig.DEST_ACL_ENABLE, connectorConfig.getString(ReplicatorConnectorConfig.DEST_ACL_ENABLE, "false")); - keyValue.put(ReplicatorConnectorConfig.DEST_ACCESS_KEY, connectorConfig.getString(ReplicatorConnectorConfig.DEST_ACCESS_KEY, "")); - keyValue.put(ReplicatorConnectorConfig.DEST_SECRET_KEY, connectorConfig.getString(ReplicatorConnectorConfig.DEST_SECRET_KEY, "")); + keyValue.put(ConnectorConfig.RMQ_ACL_ENABLE, connectorConfig.getString(ReplicatorConnectorConfig.DEST_ACL_ENABLE, "false")); + keyValue.put(ConnectorConfig.RMQ_ACCESS_KEY, connectorConfig.getString(ReplicatorConnectorConfig.DEST_ACCESS_KEY, "")); + keyValue.put(ConnectorConfig.RMQ_SECRET_KEY, connectorConfig.getString(ReplicatorConnectorConfig.DEST_SECRET_KEY, "")); + keyValue.put(ConnectorConfig.USE_NAMESRV_OF_CONNECTOR, "true"); keyValue.put(ReplicatorConnectorConfig.SYNC_TPS, connectorConfig.getInt(ReplicatorConnectorConfig.SYNC_TPS, ReplicatorConnectorConfig.DEFAULT_SYNC_TPS)); @@ -208,7 +216,7 @@ public Class taskClass() { return ReplicatorSourceTask.class; } - private Set neededParamKeys = new HashSet() { + private static Set neededParamKeys = new HashSet() { { add(ReplicatorConnectorConfig.SRC_CLOUD); add(ReplicatorConnectorConfig.SRC_REGION); @@ -220,7 +228,6 @@ public Class taskClass() { add(ReplicatorConnectorConfig.DEST_CLUSTER); add(ReplicatorConnectorConfig.DEST_ENDPOINT); add(ReplicatorConnectorConfig.DEST_TOPIC); - add(ReplicatorConnectorConfig.SRC_CLOUD); add(ReplicatorConnectorConfig.SRC_ACL_ENABLE); add(ReplicatorConnectorConfig.DEST_ACL_ENABLE); add(ERRORS_TOLERANCE_CONFIG); diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java index 5da50d98a..0290f47c4 100644 --- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java +++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceTask.java @@ -16,11 +16,39 @@ */ package org.apache.rocketmq.replicator; +import static org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerSinkTask.QUEUE_OFFSET; +import static org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerSinkTask.TOPIC; import com.alibaba.fastjson.JSON; import io.openmessaging.KeyValue; import io.openmessaging.connector.api.component.task.source.SourceTask; -import io.openmessaging.connector.api.data.*; +import io.openmessaging.connector.api.data.ConnectRecord; +import io.openmessaging.connector.api.data.RecordOffset; +import io.openmessaging.connector.api.data.RecordPartition; +import io.openmessaging.connector.api.data.Schema; +import io.openmessaging.connector.api.data.SchemaBuilder; import io.openmessaging.internal.DefaultKeyValue; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.TreeMap; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.BiConsumer; +import java.util.function.Consumer; import org.apache.commons.collections.MapUtils; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.acl.common.AclClientRPCHook; @@ -58,19 +86,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.nio.charset.StandardCharsets; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.function.BiConsumer; -import java.util.function.Consumer; - -import static org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerSinkTask.QUEUE_OFFSET; -import static org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerSinkTask.TOPIC; - /** * @author osgoo @@ -505,7 +520,7 @@ private ConnectRecord convertToSinkDataEntry(MessageExt message) { ConnectRecord sinkDataEntry = null; String connectTimestamp = properties.get(ConnectorConfig.CONNECT_TIMESTAMP); - timestamp = StringUtils.isNotEmpty(connectTimestamp) ? Long.valueOf(connectTimestamp) : System.currentTimeMillis(); + timestamp = StringUtils.isNotEmpty(connectTimestamp) ? Long.parseLong(connectTimestamp) : System.currentTimeMillis(); // String connectSchema = properties.get(ConnectorConfig.CONNECT_SCHEMA); // schema = StringUtils.isNotEmpty(connectSchema) ? JSON.parseObject(connectSchema, Schema.class) : null; Schema schema = SchemaBuilder.string().build(); @@ -663,10 +678,10 @@ public void commit(ConnectRecord record, Map metadata) { Map map = record.getPosition().getPartition().getPartition(); String brokerName = (String) map.get("brokerName"); String topic = (String) map.get("topic"); - int queueId = Integer.valueOf((String) map.get("queueId")); + int queueId = Integer.parseInt((String) map.get("queueId")); MessageQueue mq = new MessageQueue(topic, brokerName, queueId); Map offsetMap = record.getPosition().getOffset().getOffset(); - long offset = Long.valueOf((String) offsetMap.get(QUEUE_OFFSET)); + long offset = Long.parseLong((String) offsetMap.get(QUEUE_OFFSET)); long canCommitOffset = removeMessage(mq, offset); commitOffset(mq, canCommitOffset); } catch (Exception e) { @@ -702,9 +717,9 @@ public void start(KeyValue config) { connectorConfig.setDestInstanceId(config.getString(ReplicatorConnectorConfig.DEST_INSTANCEID)); connectorConfig.setDestEndpoint(config.getString(ReplicatorConnectorConfig.DEST_ENDPOINT)); connectorConfig.setDestTopic(config.getString(ReplicatorConnectorConfig.DEST_TOPIC)); - connectorConfig.setDestAclEnable(Boolean.valueOf(config.getString(ReplicatorConnectorConfig.DEST_ACL_ENABLE, "true"))); - connectorConfig.setSrcAclEnable(Boolean.valueOf(config.getString(ReplicatorConnectorConfig.SRC_ACL_ENABLE, "true"))); - connectorConfig.setAutoCreateInnerConsumergroup(Boolean.valueOf(config.getString(ReplicatorConnectorConfig.AUTO_CREATE_INNER_CONSUMERGROUP, "false"))); + connectorConfig.setDestAclEnable(Boolean.parseBoolean(config.getString(ReplicatorConnectorConfig.DEST_ACL_ENABLE, "true"))); + connectorConfig.setSrcAclEnable(Boolean.parseBoolean(config.getString(ReplicatorConnectorConfig.SRC_ACL_ENABLE, "true"))); + connectorConfig.setAutoCreateInnerConsumergroup(Boolean.parseBoolean(config.getString(ReplicatorConnectorConfig.AUTO_CREATE_INNER_CONSUMERGROUP, "false"))); connectorConfig.setSyncTps(config.getInt(ReplicatorConnectorConfig.SYNC_TPS)); connectorConfig.setDividedNormalQueues(config.getString(ReplicatorConnectorConfig.DIVIDED_NORMAL_QUEUES)); @@ -713,7 +728,7 @@ public void start(KeyValue config) { connectorConfig.setConsumeFromWhere(config.getString(ReplicatorConnectorConfig.CONSUME_FROM_WHERE, ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET.name())); if (connectorConfig.getConsumeFromWhere() == ConsumeFromWhere.CONSUME_FROM_TIMESTAMP) { - connectorConfig.setConsumeFromTimestamp(Long.valueOf(config.getString(ReplicatorConnectorConfig.CONSUME_FROM_TIMESTAMP))); + connectorConfig.setConsumeFromTimestamp(Long.parseLong(config.getString(ReplicatorConnectorConfig.CONSUME_FROM_TIMESTAMP))); } log.info("ReplicatorSourceTask connectorConfig : " + connectorConfig); @@ -734,8 +749,7 @@ public void start(KeyValue config) { buildConsumer(); log.info("buildConsumer finished."); // init limiter - int limit = connectorConfig.getSyncTps(); - tpsLimit = limit; + tpsLimit = connectorConfig.getSyncTps(); log.info("RateLimiter init finished."); // subscribe topic & start consumer subscribeTopicAndStartConsumer(); diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/ConnectorConfig.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/ConnectorConfig.java index ea8d5394a..76cddae12 100644 --- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/ConnectorConfig.java +++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/ConnectorConfig.java @@ -67,6 +67,16 @@ public class ConnectorConfig { public static final int ERRORS_RETRY_MAX_DELAY_DEFAULT = 60000; public static final String ERRORS_TOLERANCE_CONFIG = "errors.tolerance"; public static final ToleranceType ERRORS_TOLERANCE_DEFAULT = ToleranceType.NONE; + /** + * Use connector's namesrv address to create producer in `SourceTask`. + */ + public final static String USE_NAMESRV_OF_CONNECTOR = "use.namesrv.of.connector"; + public static final String RMQ_ACL_ENABLE = "rmq.acl.enable"; + public static final String RMQ_ACCESS_KEY = "rmq.access.key"; + public static final String RMQ_SECRET_KEY = "rmq.secret_key"; + public static final String RMQ_NAMESRVADDR = "rmq.namesrvaddr"; + public static final String RMQ_PRODUCER_GROUP = "rmq.producer.group"; + public static final String RMQ_OPERATION_TIMEOUT = "rmq.operation.timeout"; /** * The required key for all configurations. */ diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java index f6ff52cf7..5ba245f2a 100644 --- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java +++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java @@ -17,6 +17,8 @@ package org.apache.rocketmq.connect.runtime.connectorwrapper; +import static org.apache.rocketmq.connect.runtime.connectorwrapper.status.AbstractStatus.State.PAUSED; +import static org.apache.rocketmq.connect.runtime.connectorwrapper.status.AbstractStatus.State.RUNNING; import com.alibaba.fastjson.JSON; import io.netty.util.concurrent.DefaultThreadFactory; import io.netty.util.internal.ConcurrentSet; @@ -78,9 +80,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.rocketmq.connect.runtime.connectorwrapper.status.AbstractStatus.State.PAUSED; -import static org.apache.rocketmq.connect.runtime.connectorwrapper.status.AbstractStatus.State.RUNNING; - /** * A worker to schedule all connectors and tasks in a process. */ @@ -870,7 +869,7 @@ private void startTask(Map> newTasks) throws Excep } if (task instanceof SourceTask) { - DefaultMQProducer producer = ConnectUtil.initDefaultMQProducer(workerConfig); + DefaultMQProducer producer = ConnectUtil.initDefaultMQProducer(workerConfig, keyValue); TransformChain transformChain = new TransformChain<>(keyValue, plugin); // create retry operator RetryWithToleranceOperator retryWithToleranceOperator = ReporterManagerUtil.createRetryWithToleranceOperator(keyValue, errorMetricsGroup); diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java index d65849a36..2a2048671 100644 --- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java +++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java @@ -17,10 +17,22 @@ package org.apache.rocketmq.connect.runtime.utils; +import static org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerSinkTask.QUEUE_OFFSET; import com.beust.jcommander.internal.Sets; import com.google.common.collect.Maps; import io.openmessaging.connector.api.data.RecordOffset; import io.openmessaging.connector.api.data.RecordPartition; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; @@ -55,20 +67,6 @@ import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.apache.rocketmq.tools.command.CommandUtil; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.atomic.AtomicLong; -import java.util.stream.Collectors; - -import static org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerSinkTask.QUEUE_OFFSET; - public class ConnectUtil { public static final String SYS_TASK_CG_PREFIX = "connect-"; @@ -111,21 +109,49 @@ public static AllocateConnAndTaskStrategy initAllocateConnAndTaskStrategy(Worker } } - public static DefaultMQProducer initDefaultMQProducer(WorkerConfig connectConfig) { + public static DefaultMQProducer initDefaultMQProducer(WorkerConfig connectConfig, ConnectKeyValue keyValue) { + boolean aclEnable; + String accessKey; + String secretKey; + String namesrvAddr; + String producerGroup; + int sendTimeout; + + if (keyValue != null + && Boolean.parseBoolean(keyValue.getString(ConnectorConfig.USE_NAMESRV_OF_CONNECTOR, "false"))) { + aclEnable = Boolean.parseBoolean(keyValue.getString(ConnectorConfig.RMQ_ACL_ENABLE, "false")); + accessKey = keyValue.getString(ConnectorConfig.RMQ_ACCESS_KEY); + secretKey = keyValue.getString(ConnectorConfig.RMQ_SECRET_KEY); + namesrvAddr = keyValue.getString(ConnectorConfig.RMQ_NAMESRVADDR); + producerGroup = keyValue.getString(ConnectorConfig.RMQ_PRODUCER_GROUP, connectConfig.getRmqProducerGroup()); + sendTimeout = keyValue.getInt(ConnectorConfig.RMQ_OPERATION_TIMEOUT, connectConfig.getOperationTimeout()); + } else { + aclEnable = connectConfig.isAclEnable(); + accessKey = connectConfig.getAccessKey(); + secretKey = connectConfig.getSecretKey(); + namesrvAddr = connectConfig.getNamesrvAddr(); + producerGroup = connectConfig.getRmqProducerGroup(); + sendTimeout = connectConfig.getOperationTimeout(); + } + RPCHook rpcHook = null; - if (connectConfig.getAclEnable()) { - rpcHook = new AclClientRPCHook(new SessionCredentials(connectConfig.getAccessKey(), connectConfig.getSecretKey())); + if (aclEnable) { + rpcHook = new AclClientRPCHook(new SessionCredentials(accessKey, secretKey)); } DefaultMQProducer producer = new DefaultMQProducer(rpcHook); - producer.setNamesrvAddr(connectConfig.getNamesrvAddr()); - producer.setInstanceName(createUniqInstance(connectConfig.getNamesrvAddr())); - producer.setProducerGroup(connectConfig.getRmqProducerGroup()); - producer.setSendMsgTimeout(connectConfig.getOperationTimeout()); + producer.setNamesrvAddr(namesrvAddr); + producer.setInstanceName(createUniqInstance(namesrvAddr)); + producer.setProducerGroup(producerGroup); + producer.setSendMsgTimeout(sendTimeout); producer.setMaxMessageSize(ConnectorConfig.MAX_MESSAGE_SIZE); producer.setLanguage(LanguageCode.JAVA); return producer; } + public static DefaultMQProducer initDefaultMQProducer(WorkerConfig connectConfig) { + return initDefaultMQProducer(connectConfig, null); + } + public static DefaultMQPullConsumer initDefaultMQPullConsumer(WorkerConfig connectConfig) { RPCHook rpcHook = null; if (connectConfig.getAclEnable()) {