Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[rocketmq-replicator] Support create producer with connector's params… #463

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,23 @@
*/
package org.apache.rocketmq.replicator;

import static org.apache.rocketmq.connect.runtime.config.ConnectorConfig.CONNECTOR_ID;
import static org.apache.rocketmq.connect.runtime.config.ConnectorConfig.ERRORS_TOLERANCE_CONFIG;
import static org.apache.rocketmq.connect.runtime.config.SourceConnectorConfig.CONNECT_TOPICNAME;
import com.alibaba.fastjson.JSON;
import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.component.task.Task;
import io.openmessaging.connector.api.component.task.source.SourceConnector;
import io.openmessaging.connector.api.errors.ConnectException;
import io.openmessaging.internal.DefaultKeyValue;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
Expand All @@ -32,20 +43,16 @@
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.replicator.config.*;
import org.apache.rocketmq.connect.runtime.config.ConnectorConfig;
import org.apache.rocketmq.connect.runtime.errors.ToleranceType;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.replicator.config.ConsumeFromWhere;
import org.apache.rocketmq.replicator.config.ReplicatorConnectorConfig;
import org.apache.rocketmq.replicator.exception.GetMetaDataException;
import org.apache.rocketmq.replicator.exception.InitMQClientException;
import org.apache.rocketmq.replicator.utils.ReplicatorUtils;
import org.apache.rocketmq.connect.runtime.errors.ToleranceType;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;

import java.util.*;

import static org.apache.rocketmq.connect.runtime.config.ConnectorConfig.CONNECTOR_ID;
import static org.apache.rocketmq.connect.runtime.config.ConnectorConfig.ERRORS_TOLERANCE_CONFIG;
import static org.apache.rocketmq.connect.runtime.config.SourceConnectorConfig.CONNECT_TOPICNAME;

/**
* @author osgoo
* @date 2022/6/16
Expand Down Expand Up @@ -176,11 +183,12 @@ public List<KeyValue> taskConfigs(int maxTasks) {
if (null != connectorConfig.getString(ReplicatorConnectorConfig.DEST_INSTANCEID)) {
keyValue.put(ReplicatorConnectorConfig.DEST_INSTANCEID, connectorConfig.getString(ReplicatorConnectorConfig.DEST_INSTANCEID));
}
keyValue.put(ReplicatorConnectorConfig.DEST_ENDPOINT, connectorConfig.getString(ReplicatorConnectorConfig.DEST_ENDPOINT));
keyValue.put(ConnectorConfig.RMQ_NAMESRVADDR, connectorConfig.getString(ReplicatorConnectorConfig.DEST_ENDPOINT));
keyValue.put(ReplicatorConnectorConfig.DEST_TOPIC, connectorConfig.getString(ReplicatorConnectorConfig.DEST_TOPIC));
keyValue.put(ReplicatorConnectorConfig.DEST_ACL_ENABLE, connectorConfig.getString(ReplicatorConnectorConfig.DEST_ACL_ENABLE, "false"));
keyValue.put(ReplicatorConnectorConfig.DEST_ACCESS_KEY, connectorConfig.getString(ReplicatorConnectorConfig.DEST_ACCESS_KEY, ""));
keyValue.put(ReplicatorConnectorConfig.DEST_SECRET_KEY, connectorConfig.getString(ReplicatorConnectorConfig.DEST_SECRET_KEY, ""));
keyValue.put(ConnectorConfig.RMQ_ACL_ENABLE, connectorConfig.getString(ReplicatorConnectorConfig.DEST_ACL_ENABLE, "false"));
keyValue.put(ConnectorConfig.RMQ_ACCESS_KEY, connectorConfig.getString(ReplicatorConnectorConfig.DEST_ACCESS_KEY, ""));
keyValue.put(ConnectorConfig.RMQ_SECRET_KEY, connectorConfig.getString(ReplicatorConnectorConfig.DEST_SECRET_KEY, ""));
keyValue.put(ConnectorConfig.USE_NAMESRV_OF_CONNECTOR, "true");

keyValue.put(ReplicatorConnectorConfig.SYNC_TPS, connectorConfig.getInt(ReplicatorConnectorConfig.SYNC_TPS, ReplicatorConnectorConfig.DEFAULT_SYNC_TPS));

Expand Down Expand Up @@ -208,7 +216,7 @@ public Class<? extends Task> taskClass() {
return ReplicatorSourceTask.class;
}

private Set<String> neededParamKeys = new HashSet<String>() {
private static Set<String> neededParamKeys = new HashSet<String>() {
{
add(ReplicatorConnectorConfig.SRC_CLOUD);
add(ReplicatorConnectorConfig.SRC_REGION);
Expand All @@ -220,7 +228,6 @@ public Class<? extends Task> taskClass() {
add(ReplicatorConnectorConfig.DEST_CLUSTER);
add(ReplicatorConnectorConfig.DEST_ENDPOINT);
add(ReplicatorConnectorConfig.DEST_TOPIC);
add(ReplicatorConnectorConfig.SRC_CLOUD);
add(ReplicatorConnectorConfig.SRC_ACL_ENABLE);
add(ReplicatorConnectorConfig.DEST_ACL_ENABLE);
add(ERRORS_TOLERANCE_CONFIG);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,39 @@
*/
package org.apache.rocketmq.replicator;

import static org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerSinkTask.QUEUE_OFFSET;
import static org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerSinkTask.TOPIC;
import com.alibaba.fastjson.JSON;
import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.component.task.source.SourceTask;
import io.openmessaging.connector.api.data.*;
import io.openmessaging.connector.api.data.ConnectRecord;
import io.openmessaging.connector.api.data.RecordOffset;
import io.openmessaging.connector.api.data.RecordPartition;
import io.openmessaging.connector.api.data.Schema;
import io.openmessaging.connector.api.data.SchemaBuilder;
import io.openmessaging.internal.DefaultKeyValue;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
Expand Down Expand Up @@ -58,21 +86,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

import static org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerSinkTask.QUEUE_OFFSET;
import static org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerSinkTask.TOPIC;

/**
* @author osgoo
* @date 2022/6/16
*/
public class ReplicatorSourceTask extends SourceTask {
private static final Logger log = LoggerFactory.getLogger(ReplicatorSourceTask.class);
Expand Down Expand Up @@ -189,6 +206,7 @@ private void buildMqAdminClient() throws MQClientException {
if (srcMQAdminExt != null) {
srcMQAdminExt.shutdown();
}
// use /home/admin/onskey white ak as default
RPCHook rpcHook = null;
if (connectorConfig.isSrcAclEnable()) {
if (StringUtils.isNotEmpty(connectorConfig.getSrcAccessKey()) && StringUtils.isNotEmpty(connectorConfig.getSrcSecretKey())) {
Expand Down Expand Up @@ -699,9 +717,9 @@ public void start(KeyValue config) {
connectorConfig.setDestInstanceId(config.getString(ReplicatorConnectorConfig.DEST_INSTANCEID));
connectorConfig.setDestEndpoint(config.getString(ReplicatorConnectorConfig.DEST_ENDPOINT));
connectorConfig.setDestTopic(config.getString(ReplicatorConnectorConfig.DEST_TOPIC));
connectorConfig.setDestAclEnable(Boolean.valueOf(config.getString(ReplicatorConnectorConfig.DEST_ACL_ENABLE, "true")));
connectorConfig.setSrcAclEnable(Boolean.valueOf(config.getString(ReplicatorConnectorConfig.SRC_ACL_ENABLE, "true")));
connectorConfig.setAutoCreateInnerConsumergroup(Boolean.valueOf(config.getString(ReplicatorConnectorConfig.AUTO_CREATE_INNER_CONSUMERGROUP, "false")));
connectorConfig.setDestAclEnable(Boolean.parseBoolean(config.getString(ReplicatorConnectorConfig.DEST_ACL_ENABLE, "true")));
connectorConfig.setSrcAclEnable(Boolean.parseBoolean(config.getString(ReplicatorConnectorConfig.SRC_ACL_ENABLE, "true")));
connectorConfig.setAutoCreateInnerConsumergroup(Boolean.parseBoolean(config.getString(ReplicatorConnectorConfig.AUTO_CREATE_INNER_CONSUMERGROUP, "false")));

connectorConfig.setSyncTps(config.getInt(ReplicatorConnectorConfig.SYNC_TPS));
connectorConfig.setDividedNormalQueues(config.getString(ReplicatorConnectorConfig.DIVIDED_NORMAL_QUEUES));
Expand All @@ -710,7 +728,7 @@ public void start(KeyValue config) {

connectorConfig.setConsumeFromWhere(config.getString(ReplicatorConnectorConfig.CONSUME_FROM_WHERE, ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET.name()));
if (connectorConfig.getConsumeFromWhere() == ConsumeFromWhere.CONSUME_FROM_TIMESTAMP) {
connectorConfig.setConsumeFromTimestamp(Long.valueOf(config.getString(ReplicatorConnectorConfig.CONSUME_FROM_TIMESTAMP)));
connectorConfig.setConsumeFromTimestamp(Long.parseLong(config.getString(ReplicatorConnectorConfig.CONSUME_FROM_TIMESTAMP)));
}
log.info("ReplicatorSourceTask connectorConfig : " + connectorConfig);

Expand All @@ -731,8 +749,7 @@ public void start(KeyValue config) {
buildConsumer();
log.info("buildConsumer finished.");
// init limiter
int limit = connectorConfig.getSyncTps();
tpsLimit = limit;
tpsLimit = connectorConfig.getSyncTps();
log.info("RateLimiter init finished.");
// subscribe topic & start consumer
subscribeTopicAndStartConsumer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,16 @@ public class ConnectorConfig {
public static final int ERRORS_RETRY_MAX_DELAY_DEFAULT = 60000;
public static final String ERRORS_TOLERANCE_CONFIG = "errors.tolerance";
public static final ToleranceType ERRORS_TOLERANCE_DEFAULT = ToleranceType.NONE;
/**
* Use connector's namesrv address to create producer in `SourceTask`.
*/
public final static String USE_NAMESRV_OF_CONNECTOR = "use.namesrv.of.connector";
public static final String RMQ_ACL_ENABLE = "rmq.acl.enable";
public static final String RMQ_ACCESS_KEY = "rmq.access.key";
public static final String RMQ_SECRET_KEY = "rmq.secret_key";
public static final String RMQ_NAMESRVADDR = "rmq.namesrvaddr";
public static final String RMQ_PRODUCER_GROUP = "rmq.producer.group";
public static final String RMQ_OPERATION_TIMEOUT = "rmq.operation.timeout";
/**
* The required key for all configurations.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.rocketmq.connect.runtime.connectorwrapper;

import static org.apache.rocketmq.connect.runtime.connectorwrapper.status.AbstractStatus.State.PAUSED;
import static org.apache.rocketmq.connect.runtime.connectorwrapper.status.AbstractStatus.State.RUNNING;
import com.alibaba.fastjson.JSON;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.internal.ConcurrentSet;
Expand Down Expand Up @@ -78,9 +80,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.rocketmq.connect.runtime.connectorwrapper.status.AbstractStatus.State.PAUSED;
import static org.apache.rocketmq.connect.runtime.connectorwrapper.status.AbstractStatus.State.RUNNING;

/**
* A worker to schedule all connectors and tasks in a process.
*/
Expand Down Expand Up @@ -870,7 +869,7 @@ private void startTask(Map<String, List<ConnectKeyValue>> newTasks) throws Excep
}

if (task instanceof SourceTask) {
DefaultMQProducer producer = ConnectUtil.initDefaultMQProducer(workerConfig);
DefaultMQProducer producer = ConnectUtil.initDefaultMQProducer(workerConfig, keyValue);
TransformChain<ConnectRecord> transformChain = new TransformChain<>(keyValue, plugin);
// create retry operator
RetryWithToleranceOperator retryWithToleranceOperator = ReporterManagerUtil.createRetryWithToleranceOperator(keyValue, errorMetricsGroup);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,22 @@

package org.apache.rocketmq.connect.runtime.utils;

import static org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerSinkTask.QUEUE_OFFSET;
import com.beust.jcommander.internal.Sets;
import com.google.common.collect.Maps;
import io.openmessaging.connector.api.data.RecordOffset;
import io.openmessaging.connector.api.data.RecordPartition;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
Expand Down Expand Up @@ -55,20 +67,6 @@
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import static org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerSinkTask.QUEUE_OFFSET;

public class ConnectUtil {

public static final String SYS_TASK_CG_PREFIX = "connect-";
Expand Down Expand Up @@ -111,21 +109,49 @@ public static AllocateConnAndTaskStrategy initAllocateConnAndTaskStrategy(Worker
}
}

public static DefaultMQProducer initDefaultMQProducer(WorkerConfig connectConfig) {
public static DefaultMQProducer initDefaultMQProducer(WorkerConfig connectConfig, ConnectKeyValue keyValue) {
boolean aclEnable;
String accessKey;
String secretKey;
String namesrvAddr;
String producerGroup;
int sendTimeout;

if (keyValue != null
&& Boolean.parseBoolean(keyValue.getString(ConnectorConfig.USE_NAMESRV_OF_CONNECTOR, "false"))) {
aclEnable = Boolean.parseBoolean(keyValue.getString(ConnectorConfig.RMQ_ACL_ENABLE, "false"));
accessKey = keyValue.getString(ConnectorConfig.RMQ_ACCESS_KEY);
secretKey = keyValue.getString(ConnectorConfig.RMQ_SECRET_KEY);
namesrvAddr = keyValue.getString(ConnectorConfig.RMQ_NAMESRVADDR);
producerGroup = keyValue.getString(ConnectorConfig.RMQ_PRODUCER_GROUP, connectConfig.getRmqProducerGroup());
sendTimeout = keyValue.getInt(ConnectorConfig.RMQ_OPERATION_TIMEOUT, connectConfig.getOperationTimeout());
} else {
aclEnable = connectConfig.isAclEnable();
accessKey = connectConfig.getAccessKey();
secretKey = connectConfig.getSecretKey();
namesrvAddr = connectConfig.getNamesrvAddr();
producerGroup = connectConfig.getRmqProducerGroup();
sendTimeout = connectConfig.getOperationTimeout();
}

RPCHook rpcHook = null;
if (connectConfig.getAclEnable()) {
rpcHook = new AclClientRPCHook(new SessionCredentials(connectConfig.getAccessKey(), connectConfig.getSecretKey()));
if (aclEnable) {
rpcHook = new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
}
DefaultMQProducer producer = new DefaultMQProducer(rpcHook);
producer.setNamesrvAddr(connectConfig.getNamesrvAddr());
producer.setInstanceName(createUniqInstance(connectConfig.getNamesrvAddr()));
producer.setProducerGroup(connectConfig.getRmqProducerGroup());
producer.setSendMsgTimeout(connectConfig.getOperationTimeout());
producer.setNamesrvAddr(namesrvAddr);
producer.setInstanceName(createUniqInstance(namesrvAddr));
producer.setProducerGroup(producerGroup);
producer.setSendMsgTimeout(sendTimeout);
producer.setMaxMessageSize(ConnectorConfig.MAX_MESSAGE_SIZE);
producer.setLanguage(LanguageCode.JAVA);
return producer;
}

public static DefaultMQProducer initDefaultMQProducer(WorkerConfig connectConfig) {
return initDefaultMQProducer(connectConfig, null);
}

public static DefaultMQPullConsumer initDefaultMQPullConsumer(WorkerConfig connectConfig) {
RPCHook rpcHook = null;
if (connectConfig.getAclEnable()) {
Expand Down