diff --git a/connectors/rocketmq-connect-debezium/pom.xml b/connectors/rocketmq-connect-debezium/pom.xml
index eabe0d83..affc5632 100644
--- a/connectors/rocketmq-connect-debezium/pom.xml
+++ b/connectors/rocketmq-connect-debezium/pom.xml
@@ -326,6 +326,11 @@
logback-core
1.2.9
+
+ org.apache.rocketmq
+ rocketmq-connect-common
+ 0.0.1-SNAPSHOT
+
diff --git a/metric-exporter/pom.xml b/metric-exporter/pom.xml
index ca5a5646..ab06e014 100644
--- a/metric-exporter/pom.xml
+++ b/metric-exporter/pom.xml
@@ -29,6 +29,7 @@
commons-lang3
3.12.0
+
org.apache.rocketmq
rocketmq-client
@@ -37,6 +38,10 @@
org.apache.rocketmq
rocketmq-tools
+
+ org.apache.rocketmq
+ rocketmq-connect-common
+
diff --git a/metric-exporter/src/main/java/org/apache/rocketmq/connect/metrics/reporter/RocketMQClientUtil.java b/metric-exporter/src/main/java/org/apache/rocketmq/connect/metrics/reporter/RocketMQClientUtil.java
deleted file mode 100644
index 114bd396..00000000
--- a/metric-exporter/src/main/java/org/apache/rocketmq/connect/metrics/reporter/RocketMQClientUtil.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.connect.metrics.reporter;
-
-import com.google.common.collect.Sets;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import org.apache.rocketmq.acl.common.AclClientRPCHook;
-import org.apache.rocketmq.acl.common.SessionCredentials;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.producer.DefaultMQProducer;
-import org.apache.rocketmq.common.TopicConfig;
-import org.apache.rocketmq.remoting.RPCHook;
-import org.apache.rocketmq.remoting.protocol.LanguageCode;
-import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
-import org.apache.rocketmq.remoting.protocol.body.SubscriptionGroupWrapper;
-import org.apache.rocketmq.remoting.protocol.route.BrokerData;
-import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
-import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
-import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
-import org.apache.rocketmq.tools.command.CommandUtil;
-
-
-/**
- * rocket connect util
- */
-public class RocketMQClientUtil {
-
- public static String createUniqInstance(String prefix) {
- return new StringBuffer(prefix).append("-").append(UUID.randomUUID()).toString();
- }
-
- private static RPCHook getAclRPCHook(String accessKey, String secretKey) {
- return new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
- }
-
-
- public static DefaultMQProducer initDefaultMQProducer(boolean aclEnabled,
- String accessKey,
- String secretKey,
- String groupId,
- String namesrvAddr) {
- RPCHook rpcHook = null;
- if (aclEnabled) {
- rpcHook = getAclRPCHook(accessKey, secretKey);
- }
- DefaultMQProducer producer = new DefaultMQProducer(rpcHook);
- producer.setNamesrvAddr(namesrvAddr);
- producer.setInstanceName(createUniqInstance(namesrvAddr));
- producer.setProducerGroup(groupId);
- producer.setSendMsgTimeout(5000);
- producer.setLanguage(LanguageCode.JAVA);
- return producer;
- }
-
- public static DefaultMQAdminExt startMQAdminTool(boolean aclEnabled,
- String accessKey,
- String secretKey,
- String groupId,
- String namesrvAddr
- ) throws MQClientException {
- DefaultMQAdminExt admin;
- if (aclEnabled) {
- admin = new DefaultMQAdminExt(new AclClientRPCHook(new SessionCredentials(accessKey, secretKey)));
- } else {
- admin = new DefaultMQAdminExt();
- }
- admin.setNamesrvAddr(namesrvAddr);
- admin.setAdminExtGroup(groupId);
- admin.setInstanceName(createUniqInstance(namesrvAddr));
- admin.start();
- return admin;
- }
-
-
- public static void createTopic(DefaultMQAdminExt defaultMQAdminExt,
- TopicConfig topicConfig) {
- try {
- ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
- Map> clusterAddrTable = clusterInfo.getClusterAddrTable();
- Set clusterNameSet = clusterAddrTable.keySet();
- for (String clusterName : clusterNameSet) {
- Set masterSet = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
- for (String addr : masterSet) {
- defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
- }
- }
- } catch (Exception e) {
- throw new RuntimeException("create topic: " + topicConfig.getTopicName() + " failed", e);
- }
- }
-
- public static boolean topicExist(DefaultMQAdminExt defaultMQAdminExt, String topic) {
- boolean foundTopicRouteInfo = false;
- try {
- TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo(topic);
- if (topicRouteData != null) {
- foundTopicRouteInfo = true;
- }
- } catch (Exception e) {
- foundTopicRouteInfo = false;
- }
- return foundTopicRouteInfo;
- }
-
- public static Set fetchAllConsumerGroup(DefaultMQAdminExt defaultMQAdminExt) {
- Set consumerGroupSet = Sets.newHashSet();
- try {
- ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
- for (BrokerData brokerData : clusterInfo.getBrokerAddrTable().values()) {
- SubscriptionGroupWrapper subscriptionGroupWrapper = defaultMQAdminExt.getAllSubscriptionGroup(brokerData.selectBrokerAddr(), 3000L);
- consumerGroupSet.addAll(subscriptionGroupWrapper.getSubscriptionGroupTable().keySet());
- }
- } catch (Exception e) {
- e.printStackTrace();
- throw new RuntimeException("fetch all topic failed", e);
- }
- return consumerGroupSet;
- }
-
- public static String createSubGroup(DefaultMQAdminExt defaultMQAdminExt, String subGroup) {
- try {
- SubscriptionGroupConfig initConfig = new SubscriptionGroupConfig();
- initConfig.setGroupName(subGroup);
- ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
- Map> clusterAddrTable = clusterInfo.getClusterAddrTable();
- Set clusterNameSet = clusterAddrTable.keySet();
- for (String clusterName : clusterNameSet) {
- Set masterSet = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
- for (String addr : masterSet) {
- defaultMQAdminExt.createAndUpdateSubscriptionGroupConfig(addr, initConfig);
- }
- }
- } catch (Exception e) {
- throw new RuntimeException("create subGroup: " + subGroup + " failed", e);
- }
- return subGroup;
- }
-}
-
-
diff --git a/metric-exporter/src/main/java/org/apache/rocketmq/connect/metrics/reporter/RocketMQScheduledReporter.java b/metric-exporter/src/main/java/org/apache/rocketmq/connect/metrics/reporter/RocketMQScheduledReporter.java
index 29229d99..f6fe3a7d 100644
--- a/metric-exporter/src/main/java/org/apache/rocketmq/connect/metrics/reporter/RocketMQScheduledReporter.java
+++ b/metric-exporter/src/main/java/org/apache/rocketmq/connect/metrics/reporter/RocketMQScheduledReporter.java
@@ -25,9 +25,11 @@
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.connect.common.ProducerConfiguration;
+import org.apache.rocketmq.connect.common.RocketMqBaseConfiguration;
+import org.apache.rocketmq.connect.common.RocketMqUtils;
import org.apache.rocketmq.connect.metrics.MetricName;
import org.apache.rocketmq.connect.metrics.ScheduledMetricsReporter;
-import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -126,27 +128,35 @@ public void config(Map configs) {
}
this.topic = configs.get(METRICS_TOPIC);
String groupId = configs.get(GROUP_ID);
- DefaultMQAdminExt defaultMQAdminExt = null;
try {
- defaultMQAdminExt = RocketMQClientUtil.startMQAdminTool(Boolean.valueOf(configs.get(ACL_ENABLED)), configs.get(ACCESS_KEY), configs.get(SECRET_KEY), groupId, configs.get(NAMESRV_ADDR));
- if (!RocketMQClientUtil.topicExist(defaultMQAdminExt, topic)) {
- RocketMQClientUtil.createTopic(defaultMQAdminExt, new TopicConfig(topic));
+ RocketMqBaseConfiguration baseConfiguration = RocketMqBaseConfiguration
+ .builder()
+ .namesrvAddr(configs.get(NAMESRV_ADDR))
+ .aclEnable(Boolean.valueOf(configs.get(ACL_ENABLED)))
+ .accessKey(configs.get(ACCESS_KEY))
+ .secretKey(configs.get(SECRET_KEY))
+ .groupId(groupId)
+ .build();
+
+ RocketMqUtils.maybeCreateTopic(baseConfiguration, new TopicConfig(topic));
+ if (!RocketMqUtils.fetchAllConsumerGroup(baseConfiguration).contains(groupId)) {
+ RocketMqUtils.createGroup(baseConfiguration, groupId);
}
- if (!RocketMQClientUtil.fetchAllConsumerGroup(defaultMQAdminExt).contains(groupId)) {
- RocketMQClientUtil.createSubGroup(defaultMQAdminExt, groupId);
- }
- this.producer = RocketMQClientUtil.initDefaultMQProducer(Boolean.valueOf(configs.get(ACL_ENABLED)), configs.get(ACCESS_KEY), configs.get(SECRET_KEY), groupId, configs.get(NAMESRV_ADDR));
+ ProducerConfiguration producerConfiguration = ProducerConfiguration
+ .producerBuilder()
+ .namesrvAddr(configs.get(NAMESRV_ADDR))
+ .aclEnable(Boolean.valueOf(configs.get(ACL_ENABLED)))
+ .accessKey(configs.get(ACCESS_KEY))
+ .secretKey(configs.get(SECRET_KEY))
+ .groupId(groupId)
+ .build();
+ this.producer = RocketMqUtils.initDefaultMQProducer(producerConfiguration);
this.producer.start();
} catch (Exception e) {
log.error("Init config failed ", e);
- } finally {
- if (defaultMQAdminExt != null) {
- defaultMQAdminExt.shutdown();
- }
}
}
-
@Override
public void start() {
this.start(10, TimeUnit.SECONDS);
diff --git a/pom.xml b/pom.xml
index 3a91a0a0..cc37e510 100644
--- a/pom.xml
+++ b/pom.xml
@@ -23,6 +23,7 @@
pom
0.0.1-SNAPSHOT
+ rocketmq-connect-common
rocketmq-connect-sample
rocketmq-connect-runtime
rocketmq-connect-cli
@@ -57,6 +58,9 @@
4.4
5.1.0
+
+ 1.18.0
+
3.8.1
UTF-8
@@ -150,6 +154,11 @@
guava
${guava.version}
+
+ org.apache.rocketmq
+ rocketmq-connect-common
+ ${project.version}
+
org.apache.rocketmq
rocketmq-connect-cli
@@ -181,21 +190,16 @@
commons-collections4
${commons-collections4.version}
-
- org.apache.rocketmq
- rocketmq-tools
- ${rocketmq.version}
-
-
- org.apache.rocketmq
- rocketmq-client
- ${rocketmq.version}
-
org.apache.maven
maven-artifact
${maven-artifact.version}
+
+ org.projectlombok
+ lombok
+ ${lombok.version}
+
diff --git a/rocketmq-connect-common/pom.xml b/rocketmq-connect-common/pom.xml
new file mode 100644
index 00000000..18511f37
--- /dev/null
+++ b/rocketmq-connect-common/pom.xml
@@ -0,0 +1,34 @@
+
+
+
+ rocketmq-connect
+ org.apache.rocketmq
+ 0.0.1-SNAPSHOT
+
+ 4.0.0
+
+ rocketmq-connect-common
+
+
+ 8
+ 8
+ UTF-8
+
+
+
+ org.apache.rocketmq
+ rocketmq-client
+
+
+ org.apache.rocketmq
+ rocketmq-tools
+
+
+ org.projectlombok
+ lombok
+
+
+
+
\ No newline at end of file
diff --git a/rocketmq-connect-common/src/main/java/org/apache/rocketmq/connect/common/ConsumerConfiguration.java b/rocketmq-connect-common/src/main/java/org/apache/rocketmq/connect/common/ConsumerConfiguration.java
new file mode 100644
index 00000000..d559c515
--- /dev/null
+++ b/rocketmq-connect-common/src/main/java/org/apache/rocketmq/connect/common/ConsumerConfiguration.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.common;
+
+import lombok.Builder;
+import lombok.Getter;
+import lombok.Setter;
+
+@Getter
+@Setter
+public class ConsumerConfiguration extends RocketMqBaseConfiguration {
+ // consumer
+ private Integer batchSize;
+ private Long pollTimeoutMillis;
+
+
+ @Builder(builderMethodName = "consumerBuilder")
+ public ConsumerConfiguration(String namesrvAddr, String groupId, boolean aclEnable, String accessKey,
+ String secretKey,
+ Integer batchSize, Long pollTimeoutMillis) {
+ super(namesrvAddr, groupId, aclEnable, accessKey, secretKey);
+ this.batchSize = batchSize;
+ this.pollTimeoutMillis = pollTimeoutMillis;
+ }
+}
diff --git a/rocketmq-connect-common/src/main/java/org/apache/rocketmq/connect/common/ProducerConfiguration.java b/rocketmq-connect-common/src/main/java/org/apache/rocketmq/connect/common/ProducerConfiguration.java
new file mode 100644
index 00000000..23d8b88a
--- /dev/null
+++ b/rocketmq-connect-common/src/main/java/org/apache/rocketmq/connect/common/ProducerConfiguration.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.common;
+
+import lombok.Builder;
+import lombok.Getter;
+import lombok.Setter;
+
+@Getter
+@Setter
+public class ProducerConfiguration extends RocketMqBaseConfiguration {
+ // producer
+ private Integer maxMessageSize;
+ private Integer sendMsgTimeout;
+
+ @Builder(builderMethodName = "producerBuilder")
+ public ProducerConfiguration(String namesrvAddr, String groupId, boolean aclEnable, String accessKey,
+ String secretKey,
+ Integer maxMessageSize, Integer sendMsgTimeout) {
+ super(namesrvAddr, groupId, aclEnable, accessKey, secretKey);
+ this.maxMessageSize = maxMessageSize;
+ this.sendMsgTimeout = sendMsgTimeout;
+ }
+}
diff --git a/rocketmq-connect-common/src/main/java/org/apache/rocketmq/connect/common/RocketMqBaseConfiguration.java b/rocketmq-connect-common/src/main/java/org/apache/rocketmq/connect/common/RocketMqBaseConfiguration.java
new file mode 100644
index 00000000..ac29e36c
--- /dev/null
+++ b/rocketmq-connect-common/src/main/java/org/apache/rocketmq/connect/common/RocketMqBaseConfiguration.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.common;
+
+import lombok.Builder;
+import lombok.Getter;
+import lombok.Setter;
+
+/**
+ * rocketmq base configuration
+ */
+@Getter
+@Setter
+@Builder
+public class RocketMqBaseConfiguration {
+ private String namesrvAddr;
+ private String groupId;
+ /**
+ * set acl config
+ */
+ private boolean aclEnable;
+ private String accessKey;
+ private String secretKey;
+}
diff --git a/rocketmq-connect-common/src/main/java/org/apache/rocketmq/connect/common/RocketMqUtils.java b/rocketmq-connect-common/src/main/java/org/apache/rocketmq/connect/common/RocketMqUtils.java
new file mode 100644
index 00000000..63c60fc5
--- /dev/null
+++ b/rocketmq-connect-common/src/main/java/org/apache/rocketmq/connect/common/RocketMqUtils.java
@@ -0,0 +1,533 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.common;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.SessionCredentials;
+import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.common.KeyBuilder;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.connect.common.constant.LoggerName;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.remoting.protocol.ResponseCode;
+import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
+import org.apache.rocketmq.remoting.protocol.admin.OffsetWrapper;
+import org.apache.rocketmq.remoting.protocol.admin.TopicOffset;
+import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
+import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
+import org.apache.rocketmq.remoting.protocol.body.SubscriptionGroupWrapper;
+import org.apache.rocketmq.remoting.protocol.route.BrokerData;
+import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
+import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.CommandUtil;
+
+/**
+ * RocketMq utils
+ */
+public class RocketMqUtils {
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
+
+ public static String createUniqInstance(String prefix) {
+ return prefix.concat("-").concat(UUID.randomUUID().toString());
+ }
+
+ /**
+ * Init default mq producer
+ *
+ * @param configuration
+ * @return
+ */
+ public static DefaultMQProducer initDefaultMQProducer(ProducerConfiguration configuration) {
+ RPCHook rpcHook = null;
+ if (configuration.isAclEnable()) {
+ rpcHook = new AclClientRPCHook(new SessionCredentials(configuration.getAccessKey(), configuration.getSecretKey()));
+ }
+ DefaultMQProducer producer = new DefaultMQProducer(rpcHook);
+ producer.setNamesrvAddr(configuration.getNamesrvAddr());
+ producer.setInstanceName(createUniqInstance(configuration.getNamesrvAddr()));
+ producer.setProducerGroup(configuration.getGroupId());
+ if (configuration.getSendMsgTimeout() != null) {
+ producer.setSendMsgTimeout(configuration.getSendMsgTimeout());
+ }
+ if (configuration.getMaxMessageSize() != null) {
+ producer.setMaxMessageSize(configuration.getMaxMessageSize());
+ }
+ producer.setLanguage(LanguageCode.JAVA);
+ return producer;
+ }
+
+ /**
+ * init default lite pull consumer
+ *
+ * @param configuration
+ * @return
+ * @throws MQClientException
+ */
+ public static DefaultLitePullConsumer initDefaultLitePullConsumer(ConsumerConfiguration configuration,
+ boolean autoCommit) {
+ RPCHook rpcHook = null;
+ if (configuration.isAclEnable()) {
+ rpcHook = new AclClientRPCHook(new SessionCredentials(configuration.getAccessKey(), configuration.getSecretKey()));
+ }
+ DefaultLitePullConsumer consumer = new DefaultLitePullConsumer(rpcHook);
+ consumer.setNamesrvAddr(configuration.getNamesrvAddr());
+ String uniqueName = Thread.currentThread().getName() + "-" + System.currentTimeMillis() % 1000;
+ consumer.setInstanceName(uniqueName);
+ consumer.setUnitName(uniqueName);
+ consumer.setAutoCommit(autoCommit);
+ if (StringUtils.isNotEmpty(configuration.getGroupId())) {
+ consumer.setConsumerGroup(configuration.getGroupId());
+ }
+ consumer.setLanguage(LanguageCode.JAVA);
+ return consumer;
+ }
+
+ /**
+ * Created when the topic is not exist
+ *
+ * @param configuration
+ * @param config
+ */
+ public static void maybeCreateTopic(RocketMqBaseConfiguration configuration, TopicConfig config) {
+ log.info("Try to create topic: {}!", config.getTopicName());
+ DefaultMQAdminExt defaultMQAdminExt = null;
+ try {
+ defaultMQAdminExt = startMQAdminTool(configuration);
+ if (existTopicRoute(defaultMQAdminExt, config.getTopicName())) {
+ log.info("Topic [{}] exist!", config.getTopicName());
+ // topic exist
+ return;
+ }
+ ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
+ Map> clusterAddrTable = clusterInfo.getClusterAddrTable();
+ Set clusterNameSet = clusterAddrTable.keySet();
+ for (String clusterName : clusterNameSet) {
+ Set masterSet = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
+ for (String addr : masterSet) {
+ defaultMQAdminExt.createAndUpdateTopicConfig(addr, config);
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Create topic [" + config.getTopicName() + "] failed", e);
+ } finally {
+ if (defaultMQAdminExt != null) {
+ defaultMQAdminExt.shutdown();
+ }
+ }
+ }
+
+ /**
+ * create topic
+ *
+ * @param configuration
+ * @param config
+ */
+ public static void createTopic(RocketMqBaseConfiguration configuration, TopicConfig config) {
+ DefaultMQAdminExt defaultMQAdminExt = null;
+ try {
+ defaultMQAdminExt = startMQAdminTool(configuration);
+ ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
+ Map> clusterAddrTable = clusterInfo.getClusterAddrTable();
+ Set clusterNameSet = clusterAddrTable.keySet();
+ for (String clusterName : clusterNameSet) {
+ Set masterSet = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
+ for (String addr : masterSet) {
+ defaultMQAdminExt.createAndUpdateTopicConfig(addr, config);
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Create topic [" + config.getTopicName() + "] failed", e);
+ } finally {
+ if (defaultMQAdminExt != null) {
+ defaultMQAdminExt.shutdown();
+ }
+ }
+ }
+
+ public static Set fetchAllConsumerGroup(RocketMqBaseConfiguration configuration) {
+ Set consumerGroupSet = Sets.newHashSet();
+ DefaultMQAdminExt defaultMQAdminExt = null;
+ try {
+ defaultMQAdminExt = startMQAdminTool(configuration);
+ ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
+ for (BrokerData brokerData : clusterInfo.getBrokerAddrTable().values()) {
+ SubscriptionGroupWrapper subscriptionGroupWrapper = defaultMQAdminExt.getAllSubscriptionGroup(brokerData.selectBrokerAddr(), 3000L);
+ consumerGroupSet.addAll(subscriptionGroupWrapper.getSubscriptionGroupTable().keySet());
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Fetch all topic failed", e);
+ } finally {
+ if (defaultMQAdminExt != null) {
+ defaultMQAdminExt.shutdown();
+ }
+ }
+ return consumerGroupSet;
+ }
+
+ public static String createGroup(RocketMqBaseConfiguration configuration, String group) {
+ DefaultMQAdminExt defaultMQAdminExt = null;
+ try {
+ defaultMQAdminExt = startMQAdminTool(configuration);
+ SubscriptionGroupConfig initConfig = new SubscriptionGroupConfig();
+ initConfig.setGroupName(group);
+ ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
+ Map> clusterAddrTable = clusterInfo.getClusterAddrTable();
+ Set clusterNameSet = clusterAddrTable.keySet();
+ for (String clusterName : clusterNameSet) {
+ Set masterSet = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
+ for (String addr : masterSet) {
+ defaultMQAdminExt.createAndUpdateSubscriptionGroupConfig(addr, initConfig);
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Create group [" + group + "] failed", e);
+ } finally {
+ if (defaultMQAdminExt != null) {
+ defaultMQAdminExt.shutdown();
+ }
+ }
+ return group;
+ }
+
+ /**
+ * Get topic offsets
+ */
+ public static Map> offsetTopics(
+ RocketMqBaseConfiguration config, List topics) {
+ Map> offsets = Maps.newHashMap();
+ DefaultMQAdminExt adminClient = null;
+ try {
+ adminClient = startMQAdminTool(config);
+ for (String topic : topics) {
+ TopicStatsTable topicStatsTable = examineTopicStats(adminClient, topic);
+ offsets.put(topic, topicStatsTable.getOffsetTable());
+ }
+ return offsets;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ } finally {
+ if (adminClient != null) {
+ adminClient.shutdown();
+ }
+ }
+ }
+
+ /**
+ * Flat topics offsets
+ */
+ public static Map flatOffsetTopics(
+ RocketMqBaseConfiguration config, List topics) {
+ Map messageQueueTopicOffsets = Maps.newHashMap();
+ offsetTopics(config, topics).values()
+ .forEach(
+ offsetTopic -> {
+ messageQueueTopicOffsets.putAll(offsetTopic);
+ });
+ return messageQueueTopicOffsets;
+ }
+
+ /**
+ * Search offsets by timestamp
+ */
+ public static Map searchOffsetsByTimestamp(
+ RocketMqBaseConfiguration config,
+ Collection messageQueues,
+ Long timestamp) {
+ Map offsets = Maps.newHashMap();
+ DefaultMQAdminExt adminClient = null;
+ try {
+ adminClient = startMQAdminTool(config);
+ for (MessageQueue messageQueue : messageQueues) {
+ long offset = adminClient.searchOffset(messageQueue, timestamp);
+ offsets.put(messageQueue, offset);
+ }
+ return offsets;
+ } catch (MQClientException e) {
+ throw new RuntimeException(e);
+ } finally {
+ if (adminClient != null) {
+ adminClient.shutdown();
+ }
+ }
+ }
+
+ /**
+ * Get consumer group offset
+ */
+ public static Map currentOffsets(RocketMqBaseConfiguration config, String groupName,
+ List topics, Set messageQueues) {
+ // Get consumer group offset
+ DefaultMQAdminExt adminClient = null;
+ try {
+ adminClient = startMQAdminTool(config);
+ Map consumerOffsets = Maps.newHashMap();
+ for (String topic : topics) {
+ ConsumeStats consumeStats = examineConsumeStats(adminClient, groupName, topic);
+ consumerOffsets.putAll(consumeStats.getOffsetTable());
+ }
+ return consumerOffsets.keySet().stream()
+ .filter(messageQueue -> messageQueues.contains(messageQueue))
+ .collect(
+ Collectors.toMap(
+ messageQueue -> messageQueue,
+ messageQueue ->
+ consumerOffsets.get(messageQueue).getConsumerOffset()));
+ } catch (Exception e) {
+ if (e instanceof MQClientException) {
+ if (((MQClientException) e).getResponseCode() == ResponseCode.TOPIC_NOT_EXIST) {
+ return Collections.emptyMap();
+ } else {
+ throw new RuntimeException(e);
+ }
+ } else {
+ throw new RuntimeException(e);
+ }
+ } finally {
+ if (adminClient != null) {
+ adminClient.shutdown();
+ }
+ }
+ }
+
+ private static DefaultMQAdminExt startMQAdminTool(
+ RocketMqBaseConfiguration configuration) throws MQClientException {
+ RPCHook rpcHook = null;
+ if (configuration.isAclEnable()) {
+ rpcHook = new AclClientRPCHook(new SessionCredentials(configuration.getAccessKey(), configuration.getSecretKey()));
+ }
+ DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+ defaultMQAdminExt.setNamesrvAddr(configuration.getNamesrvAddr());
+ defaultMQAdminExt.setAdminExtGroup(configuration.getGroupId());
+ defaultMQAdminExt.setInstanceName(createUniqInstance(configuration.getNamesrvAddr()));
+ defaultMQAdminExt.start();
+ return defaultMQAdminExt;
+ }
+
+ public static boolean isTopicExist(RocketMqBaseConfiguration connectConfig, String topic) {
+ DefaultMQAdminExt defaultMQAdminExt = null;
+ try {
+ defaultMQAdminExt = startMQAdminTool(connectConfig);
+ return existTopicRoute(defaultMQAdminExt, topic);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ } finally {
+ if (defaultMQAdminExt != null) {
+ defaultMQAdminExt.shutdown();
+ }
+ }
+ }
+
+ private static boolean existTopicRoute(DefaultMQAdminExt defaultMQAdminExt, String topic) {
+ boolean foundTopicRouteInfo = false;
+ try {
+ TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo(topic);
+ if (topicRouteData != null) {
+ foundTopicRouteInfo = true;
+ }
+ } catch (Exception e) {
+ if (e instanceof MQClientException) {
+ if (((MQClientException) e).getResponseCode() == ResponseCode.TOPIC_NOT_EXIST) {
+ foundTopicRouteInfo = false;
+ } else {
+ throw new RuntimeException("Get topic route info failed", e);
+ }
+ } else {
+ throw new RuntimeException("Get topic route info failed", e);
+ }
+ }
+ return foundTopicRouteInfo;
+ }
+
+ /**
+ * Compatible with 4.9.4 and earlier
+ *
+ * @param adminClient
+ * @param topic
+ * @return
+ */
+ private static TopicStatsTable examineTopicStats(DefaultMQAdminExt adminClient, String topic) {
+ try {
+ return adminClient.examineTopicStats(topic);
+ } catch (MQBrokerException e) {
+ // Compatible with 4.9.4 and earlier
+ if (e.getResponseCode() == ResponseCode.REQUEST_CODE_NOT_SUPPORTED) {
+ try {
+ log.warn("Examine topic stats failure , the server version is less than 5.1.0, and downward compatibility begins, {}", e.getErrorMessage());
+ return overrideExamineTopicStats(adminClient, topic);
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ } else {
+ throw new RuntimeException(e);
+ }
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ /**
+ * examineConsumeStats
+ * Compatible with 4.9.4 and earlier
+ *
+ * @param adminClient
+ * @param topic
+ * @return
+ */
+ private static ConsumeStats examineConsumeStats(DefaultMQAdminExt adminClient, String groupName, String topic) {
+ try {
+ return adminClient.examineConsumeStats(groupName, topic);
+ } catch (MQBrokerException e) {
+ // Compatible with 4.9.4 and earlier
+ if (e.getResponseCode() == ResponseCode.REQUEST_CODE_NOT_SUPPORTED) {
+ try {
+ log.warn("Examine consume stats failure, the server version is less than 5.1.0, and downward compatibility begins {}", e.getErrorMessage());
+ return overrideExamineConsumeStats(adminClient, groupName, topic);
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ } else {
+ throw new RuntimeException(e);
+ }
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ /**
+ * Compatible with version 4.9.4
+ *
+ * @param adminClient
+ * @param topic
+ * @return
+ * @throws RemotingException
+ * @throws InterruptedException
+ * @throws MQClientException
+ * @throws MQBrokerException
+ */
+ private static TopicStatsTable overrideExamineTopicStats(DefaultMQAdminExt adminClient,
+ String topic) throws RemotingException, InterruptedException, MQClientException, MQBrokerException {
+ TopicRouteData topicRouteData = adminClient.examineTopicRouteInfo(topic);
+ TopicStatsTable topicStatsTable = new TopicStatsTable();
+ for (BrokerData bd : topicRouteData.getBrokerDatas()) {
+ String addr = bd.selectBrokerAddr();
+ if (addr != null) {
+ TopicStatsTable tst = adminClient
+ .getDefaultMQAdminExtImpl()
+ .getMqClientInstance()
+ .getMQClientAPIImpl()
+ .getTopicStatsInfo(addr, topic, 5000);
+ topicStatsTable.getOffsetTable().putAll(tst.getOffsetTable());
+ }
+ }
+ return topicStatsTable;
+ }
+
+ /**
+ * Compatible with version 4.9.4
+ *
+ * @param adminExt
+ * @param groupName
+ * @param topic
+ * @return
+ * @throws MQClientException
+ * @throws RemotingException
+ * @throws InterruptedException
+ * @throws MQBrokerException
+ */
+ private static ConsumeStats overrideExamineConsumeStats(DefaultMQAdminExt adminExt, String groupName,
+ String topic) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
+ TopicRouteData topicRouteData = null;
+ List routeTopics = new ArrayList<>();
+ routeTopics.add(MixAll.getRetryTopic(groupName));
+ if (topic != null) {
+ routeTopics.add(topic);
+ routeTopics.add(KeyBuilder.buildPopRetryTopic(topic, groupName));
+ }
+ for (int i = 0; i < routeTopics.size(); i++) {
+ try {
+ topicRouteData = adminExt.getDefaultMQAdminExtImpl().examineTopicRouteInfo(routeTopics.get(i));
+ if (topicRouteData != null) {
+ break;
+ }
+ } catch (Throwable e) {
+ if (i == routeTopics.size() - 1) {
+ throw e;
+ }
+ }
+ }
+ ConsumeStats result = new ConsumeStats();
+
+ for (BrokerData bd : topicRouteData.getBrokerDatas()) {
+ String addr = bd.selectBrokerAddr();
+ if (addr != null) {
+ ConsumeStats consumeStats = adminExt.getDefaultMQAdminExtImpl().getMqClientInstance().getMQClientAPIImpl().getConsumeStats(addr, groupName, topic, 5000 * 3);
+ result.getOffsetTable().putAll(consumeStats.getOffsetTable());
+ double value = result.getConsumeTps() + consumeStats.getConsumeTps();
+ result.setConsumeTps(value);
+ }
+ }
+
+ Set topics = Sets.newHashSet();
+ for (MessageQueue messageQueue : result.getOffsetTable().keySet()) {
+ topics.add(messageQueue.getTopic());
+ }
+
+ ConsumeStats staticResult = new ConsumeStats();
+ staticResult.setConsumeTps(result.getConsumeTps());
+
+ for (String currentTopic : topics) {
+ TopicRouteData currentRoute = adminExt.getDefaultMQAdminExtImpl().examineTopicRouteInfo(currentTopic);
+ if (currentRoute.getTopicQueueMappingByBroker() == null
+ || currentRoute.getTopicQueueMappingByBroker().isEmpty()) {
+ //normal topic
+ for (Map.Entry entry : result.getOffsetTable().entrySet()) {
+ if (entry.getKey().getTopic().equals(currentTopic)) {
+ staticResult.getOffsetTable().put(entry.getKey(), entry.getValue());
+ }
+ }
+ }
+ }
+
+ if (staticResult.getOffsetTable().isEmpty()) {
+ throw new MQClientException(ResponseCode.CONSUMER_NOT_ONLINE, "Not found the consumer group consume stats, because return offset table is empty, maybe the consumer not consume any message");
+ }
+ return staticResult;
+ }
+
+}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/common/cache/Cache.java b/rocketmq-connect-common/src/main/java/org/apache/rocketmq/connect/common/cache/Cache.java
similarity index 95%
rename from rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/common/cache/Cache.java
rename to rocketmq-connect-common/src/main/java/org/apache/rocketmq/connect/common/cache/Cache.java
index ebf90522..aa603081 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/common/cache/Cache.java
+++ b/rocketmq-connect-common/src/main/java/org/apache/rocketmq/connect/common/cache/Cache.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.rocketmq.connect.runtime.common.cache;
+package org.apache.rocketmq.connect.common.cache;
/**
* cache
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/common/cache/LRUCache.java b/rocketmq-connect-common/src/main/java/org/apache/rocketmq/connect/common/cache/LRUCache.java
similarity index 97%
rename from rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/common/cache/LRUCache.java
rename to rocketmq-connect-common/src/main/java/org/apache/rocketmq/connect/common/cache/LRUCache.java
index 5a32104b..73772519 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/common/cache/LRUCache.java
+++ b/rocketmq-connect-common/src/main/java/org/apache/rocketmq/connect/common/cache/LRUCache.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.rocketmq.connect.runtime.common.cache;
+package org.apache.rocketmq.connect.common.cache;
import java.util.Collections;
import java.util.LinkedHashMap;
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/common/LoggerName.java b/rocketmq-connect-common/src/main/java/org/apache/rocketmq/connect/common/constant/LoggerName.java
similarity index 95%
rename from rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/common/LoggerName.java
rename to rocketmq-connect-common/src/main/java/org/apache/rocketmq/connect/common/constant/LoggerName.java
index 2b62956a..988aae47 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/common/LoggerName.java
+++ b/rocketmq-connect-common/src/main/java/org/apache/rocketmq/connect/common/constant/LoggerName.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.rocketmq.connect.runtime.common;
+package org.apache.rocketmq.connect.common.constant;
/**
* Define all the logger name of the runtime.
diff --git a/rocketmq-connect-runtime/pom.xml b/rocketmq-connect-runtime/pom.xml
index 4ed44af4..0ed0dd69 100644
--- a/rocketmq-connect-runtime/pom.xml
+++ b/rocketmq-connect-runtime/pom.xml
@@ -267,5 +267,10 @@
metric-exporter
+
+ org.apache.rocketmq
+ rocketmq-connect-common
+
+
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/DistributedConnectStartup.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/DistributedConnectStartup.java
index 4a35f30c..412c9ddc 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/DistributedConnectStartup.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/DistributedConnectStartup.java
@@ -24,7 +24,7 @@
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.lang3.StringUtils;
-import org.apache.rocketmq.connect.runtime.common.LoggerName;
+import org.apache.rocketmq.connect.common.constant.LoggerName;
import org.apache.rocketmq.connect.runtime.config.WorkerConfig;
import org.apache.rocketmq.connect.runtime.controller.distributed.DistributedConfig;
import org.apache.rocketmq.connect.runtime.controller.distributed.DistributedConnectController;
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/StandaloneConnectStartup.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/StandaloneConnectStartup.java
index 4b8913e3..30c2e427 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/StandaloneConnectStartup.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/StandaloneConnectStartup.java
@@ -24,7 +24,7 @@
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.lang3.StringUtils;
-import org.apache.rocketmq.connect.runtime.common.LoggerName;
+import org.apache.rocketmq.connect.common.constant.LoggerName;
import org.apache.rocketmq.connect.runtime.config.WorkerConfig;
import org.apache.rocketmq.connect.runtime.controller.isolation.Plugin;
import org.apache.rocketmq.connect.runtime.controller.standalone.StandaloneConfig;
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TransformChain.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TransformChain.java
index 5592add7..bf3c2c08 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TransformChain.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TransformChain.java
@@ -26,7 +26,7 @@
import io.openmessaging.internal.DefaultKeyValue;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.rocketmq.connect.runtime.common.LoggerName;
+import org.apache.rocketmq.connect.common.constant.LoggerName;
import org.apache.rocketmq.connect.runtime.config.ConnectorConfig;
import org.apache.rocketmq.connect.runtime.controller.isolation.Plugin;
import org.apache.rocketmq.connect.runtime.errors.ErrorReporter;
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
index f6ff52cf..583b1798 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
@@ -50,8 +50,8 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.connect.common.constant.LoggerName;
import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
-import org.apache.rocketmq.connect.runtime.common.LoggerName;
import org.apache.rocketmq.connect.runtime.config.ConnectorConfig;
import org.apache.rocketmq.connect.runtime.config.SinkConnectorConfig;
import org.apache.rocketmq.connect.runtime.config.WorkerConfig;
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
index 59641fd2..cdbde44f 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
@@ -27,8 +27,8 @@
import io.openmessaging.connector.api.data.RecordOffset;
import io.openmessaging.connector.api.data.RecordPartition;
import io.openmessaging.connector.api.storage.OffsetStorageReader;
+import org.apache.rocketmq.connect.common.constant.LoggerName;
import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
-import org.apache.rocketmq.connect.runtime.common.LoggerName;
import org.apache.rocketmq.connect.runtime.config.WorkerConfig;
import org.apache.rocketmq.connect.runtime.connectorwrapper.status.WrapperStatusListener;
import org.apache.rocketmq.connect.runtime.errors.RetryWithToleranceOperator;
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
index ee135cbd..5c2777ca 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
@@ -52,12 +52,12 @@
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.connect.common.constant.LoggerName;
import org.apache.rocketmq.connect.metrics.stats.Avg;
import org.apache.rocketmq.connect.metrics.stats.CumulativeCount;
import org.apache.rocketmq.connect.metrics.stats.Max;
import org.apache.rocketmq.connect.metrics.stats.Rate;
import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
-import org.apache.rocketmq.connect.runtime.common.LoggerName;
import org.apache.rocketmq.connect.runtime.config.ConnectorConfig;
import org.apache.rocketmq.connect.runtime.config.SinkConnectorConfig;
import org.apache.rocketmq.connect.runtime.config.WorkerConfig;
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTaskContext.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTaskContext.java
index 726fd9f0..5b71494d 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTaskContext.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTaskContext.java
@@ -28,8 +28,8 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.connect.common.constant.LoggerName;
import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
-import org.apache.rocketmq.connect.runtime.common.LoggerName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
index f379b7fb..7b4097d7 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
@@ -50,12 +50,12 @@
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.connect.common.constant.LoggerName;
import org.apache.rocketmq.connect.metrics.stats.Avg;
import org.apache.rocketmq.connect.metrics.stats.CumulativeCount;
import org.apache.rocketmq.connect.metrics.stats.Max;
import org.apache.rocketmq.connect.metrics.stats.Rate;
import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
-import org.apache.rocketmq.connect.runtime.common.LoggerName;
import org.apache.rocketmq.connect.runtime.config.ConnectorConfig;
import org.apache.rocketmq.connect.runtime.config.SourceConnectorConfig;
import org.apache.rocketmq.connect.runtime.config.WorkerConfig;
@@ -428,9 +428,7 @@ private String maybeCreateAndGetTopic(ConnectRecord record) {
if (StringUtils.isBlank(topic)) {
throw new ConnectException("source connect lack of topic config");
}
- if (!ConnectUtil.isTopicExist(workerConfig, topic)) {
- ConnectUtil.createTopic(workerConfig, new TopicConfig(topic));
- }
+ ConnectUtil.maybeCreateTopic(workerConfig, new TopicConfig(topic));
return topic;
}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/AbstractConnectController.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/AbstractConnectController.java
index 8a879751..98e65b4f 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/AbstractConnectController.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/AbstractConnectController.java
@@ -23,8 +23,8 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import org.apache.rocketmq.connect.common.constant.LoggerName;
import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
-import org.apache.rocketmq.connect.runtime.common.LoggerName;
import org.apache.rocketmq.connect.runtime.config.ConnectorConfig;
import org.apache.rocketmq.connect.runtime.config.WorkerConfig;
import org.apache.rocketmq.connect.runtime.connectorwrapper.Worker;
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/distributed/DistributedConnectController.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/distributed/DistributedConnectController.java
index 3417f6fd..bd3aa293 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/distributed/DistributedConnectController.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/distributed/DistributedConnectController.java
@@ -17,7 +17,7 @@
package org.apache.rocketmq.connect.runtime.controller.distributed;
-import org.apache.rocketmq.connect.runtime.common.LoggerName;
+import org.apache.rocketmq.connect.common.constant.LoggerName;
import org.apache.rocketmq.connect.runtime.controller.AbstractConnectController;
import org.apache.rocketmq.connect.runtime.controller.isolation.Plugin;
import org.apache.rocketmq.connect.runtime.service.ClusterManagementService;
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/converter/record/json/JsonConverter.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/converter/record/json/JsonConverter.java
index d9e82594..9ed801f7 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/converter/record/json/JsonConverter.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/converter/record/json/JsonConverter.java
@@ -33,9 +33,9 @@
import io.openmessaging.connector.api.data.logical.Timestamp;
import io.openmessaging.connector.api.errors.ConnectException;
import org.apache.commons.lang3.StringUtils;
-import org.apache.rocketmq.connect.runtime.common.LoggerName;
-import org.apache.rocketmq.connect.runtime.common.cache.Cache;
-import org.apache.rocketmq.connect.runtime.common.cache.LRUCache;
+import org.apache.rocketmq.connect.common.constant.LoggerName;
+import org.apache.rocketmq.connect.common.cache.Cache;
+import org.apache.rocketmq.connect.common.cache.LRUCache;
import org.apache.rocketmq.connect.runtime.serialization.JsonDeserializer;
import org.apache.rocketmq.connect.runtime.serialization.JsonSerializer;
import org.slf4j.Logger;
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/DeadLetterQueueReporter.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/DeadLetterQueueReporter.java
index b0635e16..9f4fbd1c 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/DeadLetterQueueReporter.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/DeadLetterQueueReporter.java
@@ -117,12 +117,12 @@ public static DeadLetterQueueReporter build(ConnectorTaskId connectorTaskId,
if (dlqTopic.isEmpty()) {
return null;
}
- if (!ConnectUtil.isTopicExist(workerConfig, dlqTopic)) {
- TopicConfig topicConfig = new TopicConfig(dlqTopic);
- topicConfig.setReadQueueNums(deadLetterQueueConfig.dlqTopicReadQueueNums());
- topicConfig.setWriteQueueNums(deadLetterQueueConfig.dlqTopicWriteQueueNums());
- ConnectUtil.createTopic(workerConfig, topicConfig);
- }
+
+ TopicConfig topicConfig = new TopicConfig(dlqTopic);
+ topicConfig.setReadQueueNums(deadLetterQueueConfig.dlqTopicReadQueueNums());
+ topicConfig.setWriteQueueNums(deadLetterQueueConfig.dlqTopicWriteQueueNums());
+ ConnectUtil.maybeCreateTopic(workerConfig, topicConfig);
+
DefaultMQProducer dlqProducer = ConnectUtil.initDefaultMQProducer(workerConfig);
return new DeadLetterQueueReporter(dlqProducer, sinkConfig, connectorTaskId, errorMetricsGroup);
}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/metrics/ConnectMetrics.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/metrics/ConnectMetrics.java
index 4ad0e56a..ed3efda7 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/metrics/ConnectMetrics.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/metrics/ConnectMetrics.java
@@ -19,10 +19,10 @@
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Reporter;
import com.codahale.metrics.Slf4jReporter;
+import org.apache.rocketmq.connect.common.constant.LoggerName;
import org.apache.rocketmq.connect.metrics.IReporter;
import org.apache.rocketmq.connect.metrics.MetricsReporter;
import org.apache.rocketmq.connect.metrics.ScheduledMetricsReporter;
-import org.apache.rocketmq.connect.runtime.common.LoggerName;
import org.apache.rocketmq.connect.runtime.config.WorkerConfig;
import org.apache.rocketmq.connect.runtime.utils.Utils;
import org.slf4j.Logger;
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/ConnectorPluginsResource.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/ConnectorPluginsResource.java
index 3a1113b5..cb43b37e 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/ConnectorPluginsResource.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/ConnectorPluginsResource.java
@@ -19,7 +19,7 @@
import io.javalin.http.Context;
import io.openmessaging.connector.api.component.task.sink.SinkConnector;
import io.openmessaging.connector.api.component.task.source.SourceConnector;
-import org.apache.rocketmq.connect.runtime.common.LoggerName;
+import org.apache.rocketmq.connect.common.constant.LoggerName;
import org.apache.rocketmq.connect.runtime.controller.AbstractConnectController;
import org.apache.rocketmq.connect.runtime.controller.isolation.PluginType;
import org.apache.rocketmq.connect.runtime.controller.isolation.PluginWrapper;
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java
index b4f6c378..d4b8db3d 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java
@@ -22,8 +22,8 @@
import io.javalin.Javalin;
import io.javalin.http.Context;
import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.connect.common.constant.LoggerName;
import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
-import org.apache.rocketmq.connect.runtime.common.LoggerName;
import org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerConnector;
import org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerTask;
import org.apache.rocketmq.connect.runtime.controller.AbstractConnectController;
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/ListDeserializer.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/ListDeserializer.java
index 9b1e6abe..2305f690 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/ListDeserializer.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/ListDeserializer.java
@@ -18,7 +18,7 @@
package org.apache.rocketmq.connect.runtime.serialization;
import com.alibaba.fastjson.JSONArray;
-import org.apache.rocketmq.connect.runtime.common.LoggerName;
+import org.apache.rocketmq.connect.common.constant.LoggerName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/ListSerializer.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/ListSerializer.java
index 7aec653f..44692658 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/ListSerializer.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/ListSerializer.java
@@ -18,7 +18,7 @@
package org.apache.rocketmq.connect.runtime.serialization;
import com.alibaba.fastjson.JSON;
-import org.apache.rocketmq.connect.runtime.common.LoggerName;
+import org.apache.rocketmq.connect.common.constant.LoggerName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/ConnectKeyValueDeserializer.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/ConnectKeyValueDeserializer.java
index 353fd877..c5e5d86f 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/ConnectKeyValueDeserializer.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/ConnectKeyValueDeserializer.java
@@ -18,8 +18,8 @@
package org.apache.rocketmq.connect.runtime.serialization.store;
import com.alibaba.fastjson.JSON;
+import org.apache.rocketmq.connect.common.constant.LoggerName;
import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
-import org.apache.rocketmq.connect.runtime.common.LoggerName;
import org.apache.rocketmq.connect.runtime.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/ConnectKeyValueSerializer.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/ConnectKeyValueSerializer.java
index 7042f302..0482172c 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/ConnectKeyValueSerializer.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/ConnectKeyValueSerializer.java
@@ -18,8 +18,8 @@
package org.apache.rocketmq.connect.runtime.serialization.store;
import com.alibaba.fastjson.JSON;
+import org.apache.rocketmq.connect.common.constant.LoggerName;
import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
-import org.apache.rocketmq.connect.runtime.common.LoggerName;
import org.apache.rocketmq.connect.runtime.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/RecordOffsetDeserializer.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/RecordOffsetDeserializer.java
index ce09f518..373c0455 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/RecordOffsetDeserializer.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/RecordOffsetDeserializer.java
@@ -19,7 +19,7 @@
import com.alibaba.fastjson.JSON;
import io.openmessaging.connector.api.data.RecordOffset;
-import org.apache.rocketmq.connect.runtime.common.LoggerName;
+import org.apache.rocketmq.connect.common.constant.LoggerName;
import org.apache.rocketmq.connect.runtime.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/RecordOffsetSerializer.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/RecordOffsetSerializer.java
index 8b6a60c1..994e7e22 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/RecordOffsetSerializer.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/RecordOffsetSerializer.java
@@ -19,7 +19,7 @@
import com.alibaba.fastjson.JSON;
import io.openmessaging.connector.api.data.RecordOffset;
-import org.apache.rocketmq.connect.runtime.common.LoggerName;
+import org.apache.rocketmq.connect.common.constant.LoggerName;
import org.apache.rocketmq.connect.runtime.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/RecordPartitionDeserializer.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/RecordPartitionDeserializer.java
index 7837eea7..b3a76343 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/RecordPartitionDeserializer.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/RecordPartitionDeserializer.java
@@ -18,7 +18,7 @@
package org.apache.rocketmq.connect.runtime.serialization.store;
import com.alibaba.fastjson.JSON;
-import org.apache.rocketmq.connect.runtime.common.LoggerName;
+import org.apache.rocketmq.connect.common.constant.LoggerName;
import org.apache.rocketmq.connect.runtime.serialization.Deserializer;
import org.apache.rocketmq.connect.runtime.store.ExtendRecordPartition;
import org.slf4j.Logger;
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/RecordPartitionSerializer.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/RecordPartitionSerializer.java
index e80c84ca..a28741d6 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/RecordPartitionSerializer.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/RecordPartitionSerializer.java
@@ -18,7 +18,7 @@
package org.apache.rocketmq.connect.runtime.serialization.store;
import com.alibaba.fastjson.JSON;
-import org.apache.rocketmq.connect.runtime.common.LoggerName;
+import org.apache.rocketmq.connect.common.constant.LoggerName;
import org.apache.rocketmq.connect.runtime.serialization.Serializer;
import org.apache.rocketmq.connect.runtime.store.ExtendRecordPartition;
import org.slf4j.Logger;
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/RecordPositionMapDeserializer.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/RecordPositionMapDeserializer.java
index e2f88364..80c59a28 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/RecordPositionMapDeserializer.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/RecordPositionMapDeserializer.java
@@ -19,7 +19,7 @@
import com.alibaba.fastjson.JSON;
import io.openmessaging.connector.api.data.RecordOffset;
-import org.apache.rocketmq.connect.runtime.common.LoggerName;
+import org.apache.rocketmq.connect.common.constant.LoggerName;
import org.apache.rocketmq.connect.runtime.serialization.Deserializer;
import org.apache.rocketmq.connect.runtime.store.ExtendRecordPartition;
import org.slf4j.Logger;
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/RecordPositionMapSerializer.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/RecordPositionMapSerializer.java
index 030bf19d..e2e9624c 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/RecordPositionMapSerializer.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/serialization/store/RecordPositionMapSerializer.java
@@ -19,7 +19,7 @@
import com.alibaba.fastjson.JSON;
import io.openmessaging.connector.api.data.RecordOffset;
-import org.apache.rocketmq.connect.runtime.common.LoggerName;
+import org.apache.rocketmq.connect.common.constant.LoggerName;
import org.apache.rocketmq.connect.runtime.serialization.Serializer;
import org.apache.rocketmq.connect.runtime.store.ExtendRecordPartition;
import org.slf4j.Logger;
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/AbstractConfigManagementService.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/AbstractConfigManagementService.java
index 8dbf4838..3e950507 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/AbstractConfigManagementService.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/AbstractConfigManagementService.java
@@ -19,8 +19,6 @@
import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.component.connector.Connector;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.rocketmq.connect.runtime.common.ConfigException;
import io.openmessaging.connector.api.component.task.sink.SinkConnector;
import io.openmessaging.connector.api.component.task.source.SourceConnector;
import io.openmessaging.connector.api.data.RecordConverter;
@@ -29,9 +27,19 @@
import io.openmessaging.connector.api.data.SchemaBuilder;
import io.openmessaging.connector.api.data.Struct;
import io.openmessaging.connector.api.errors.ConnectException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.connect.common.constant.LoggerName;
+import org.apache.rocketmq.connect.runtime.common.ConfigException;
import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
-import org.apache.rocketmq.connect.runtime.common.LoggerName;
import org.apache.rocketmq.connect.runtime.config.ConnectorConfig;
import org.apache.rocketmq.connect.runtime.config.SinkConnectorConfig;
import org.apache.rocketmq.connect.runtime.config.SourceConnectorConfig;
@@ -49,15 +57,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
import static org.apache.rocketmq.connect.runtime.config.ConnectorConfig.CONNECTOR_CLASS;
/**
@@ -72,53 +71,52 @@ public abstract class AbstractConfigManagementService implements ConfigManagemen
public static final String DELETE_CONNECTOR_PREFIX = "delete-";
protected static final String FIELD_STATE = "state";
protected static final String FIELD_EPOCH = "epoch";
- protected static final String FIELD_PROPS = "properties";
- protected static final String FIELD_DELETED = "deleted";
/**
* delete connector V0
*/
@Deprecated
public static final Schema CONNECTOR_DELETE_CONFIGURATION_V0 = SchemaBuilder.struct()
- .field(FIELD_EPOCH, SchemaBuilder.int64().build())
- .build();
-
- /**
- * delete connector V1
- */
- public static final Schema CONNECTOR_DELETE_CONFIGURATION_V1 = SchemaBuilder.struct()
- .field(FIELD_EPOCH, SchemaBuilder.int64().build())
- .field(FIELD_DELETED, SchemaBuilder.bool().build())
- .build();
+ .field(FIELD_EPOCH, SchemaBuilder.int64().build())
+ .build();
/**
* connector state
*/
public static final Schema TARGET_STATE_V0 = SchemaBuilder.struct()
- .field(FIELD_STATE, SchemaBuilder.string().build())
- .field(FIELD_EPOCH, SchemaBuilder.int64().build())
- .build();
+ .field(FIELD_STATE, SchemaBuilder.string().build())
+ .field(FIELD_EPOCH, SchemaBuilder.int64().build())
+ .build();
+ protected static final String FIELD_PROPS = "properties";
/**
* connector configuration
*/
public static final Schema CONNECTOR_CONFIGURATION_V0 = SchemaBuilder.struct()
- .field(FIELD_STATE, SchemaBuilder.string().build())
- .field(FIELD_EPOCH, SchemaBuilder.int64().build())
- .field(FIELD_PROPS,
- SchemaBuilder.map(
- SchemaBuilder.string().optional().build(),
- SchemaBuilder.string().optional().build()
- ).build())
- .build();
+ .field(FIELD_STATE, SchemaBuilder.string().build())
+ .field(FIELD_EPOCH, SchemaBuilder.int64().build())
+ .field(FIELD_PROPS,
+ SchemaBuilder.map(
+ SchemaBuilder.string().optional().build(),
+ SchemaBuilder.string().optional().build()
+ ).build())
+ .build();
/**
* task configuration
*/
public static final Schema TASK_CONFIGURATION_V0 = SchemaBuilder.struct()
- .field(FIELD_EPOCH, SchemaBuilder.int64().build())
- .field(FIELD_PROPS,
- SchemaBuilder.map(
- SchemaBuilder.string().build(),
- SchemaBuilder.string().optional().build()
- ).build())
- .build();
+ .field(FIELD_EPOCH, SchemaBuilder.int64().build())
+ .field(FIELD_PROPS,
+ SchemaBuilder.map(
+ SchemaBuilder.string().build(),
+ SchemaBuilder.string().optional().build()
+ ).build())
+ .build();
+ protected static final String FIELD_DELETED = "deleted";
+ /**
+ * delete connector V1
+ */
+ public static final Schema CONNECTOR_DELETE_CONFIGURATION_V1 = SchemaBuilder.struct()
+ .field(FIELD_EPOCH, SchemaBuilder.int64().build())
+ .field(FIELD_DELETED, SchemaBuilder.bool().build())
+ .build();
private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
protected final String configManagePrefix = "ConfigManage";
/**
@@ -172,7 +170,6 @@ public void initialize(WorkerConfig workerConfig, RecordConverter converter, Plu
this.dataSynchronizer = initializationDataSynchronizer(workerConfig);
}
-
@Override
public boolean enabledCompactTopic() {
return false;
@@ -306,7 +303,6 @@ protected void putTaskConfigs(String connectorName, List config
taskKeyValueStore.put(connectorName, configs);
}
-
@Override
public void recomputeTaskConfigs(String connectorName, ConnectKeyValue configs) {
int maxTask = configs.getInt(ConnectorConfig.MAX_TASK, ConnectorConfig.TASKS_MAX_DEFAULT);
@@ -394,7 +390,6 @@ public ClusterConfigState snapshot() {
return new ClusterConfigState(connectorTaskCounts, connectorConfigs, connectorTargetStates, connectorTaskConfigs);
}
-
@Override
public Plugin getPlugin() {
return this.plugin;
@@ -418,22 +413,22 @@ public void triggerListener() {
}
}
-
// ======= Start receives the config message and transforms the storage ======
protected void process(String key, SchemaAndValue schemaAndValue) {
if (key.startsWith(TARGET_STATE_PREFIX)) {
// target state listener
String connectorName = key.substring(TARGET_STATE_PREFIX.length());
+ processTargetStateRecord(connectorName, schemaAndValue);
+
+ } else if (key.startsWith(CONNECTOR_PREFIX)) {
+ // connector config update
+ String connectorName = key.substring(CONNECTOR_PREFIX.length());
if (schemaAndValue.schema().equals(CONNECTOR_DELETE_CONFIGURATION_V1)) {
processDeleteConnectorRecord(connectorName, schemaAndValue);
} else {
- processTargetStateRecord(connectorName, schemaAndValue);
+ processConnectorConfigRecord(connectorName, schemaAndValue);
}
- } else if (key.startsWith(CONNECTOR_PREFIX)) {
- // connector config update
- String connectorName = key.substring(CONNECTOR_PREFIX.length());
- processConnectorConfigRecord(connectorName, schemaAndValue);
} else if (key.startsWith(TASK_PREFIX)) {
// task config update
ConnectorTaskId taskId = parseTaskId(key);
@@ -468,8 +463,12 @@ private void processDeleteConnectorRecord(String connectorName, SchemaAndValue s
// config update
if ((Long) epoch > oldConfig.getEpoch()) {
// remove
- connectorKeyValueStore.remove(connectorName);
- taskKeyValueStore.remove(connectorName);
+ if (connectorKeyValueStore.containsKey(connectorName)) {
+ connectorKeyValueStore.remove(connectorName);
+ }
+ if (taskKeyValueStore.containsKey(connectorName)) {
+ taskKeyValueStore.remove(connectorName);
+ }
// reblance
triggerListener();
}
@@ -515,14 +514,14 @@ private void processTargetStateRecord(String connectorName, SchemaAndValue schem
if (!(targetState instanceof String)) {
// target state
log.error("Invalid data for target state for connector '{}': 'state' field should be a String but is {}",
- connectorName, className(targetState));
+ connectorName, className(targetState));
return;
}
Object epoch = struct.get(FIELD_EPOCH);
if (!(epoch instanceof Long)) {
// epoch
log.error("Invalid data for epoch for connector '{}': 'epoch' field should be a Long but is {}",
- connectorName, className(epoch));
+ connectorName, className(epoch));
return;
}
@@ -550,21 +549,21 @@ private boolean mergeConnectConfig(String connectName, SchemaAndValue schemaAndV
if (!(targetState instanceof String)) {
// target state
log.error("Invalid data for target state for connector '{}': 'state' field should be a String but is {}",
- connectName, className(targetState));
+ connectName, className(targetState));
return false;
}
Object epoch = value.get(FIELD_EPOCH);
if (!(epoch instanceof Long)) {
// epoch
log.error("Invalid data for epoch for connector '{}': 'state' field should be a long but is {}",
- connectName, className(epoch));
+ connectName, className(epoch));
return false;
}
Object props = value.get(FIELD_PROPS);
if (!(props instanceof Map)) {
// properties
log.error("Invalid data for properties for connector '{}': 'state' field should be a Map but is {}",
- connectName, className(props));
+ connectName, className(props));
return false;
}
// new configs
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/AbstractPositionManagementService.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/AbstractPositionManagementService.java
index 2929d4ad..a469b182 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/AbstractPositionManagementService.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/AbstractPositionManagementService.java
@@ -21,7 +21,7 @@
import io.openmessaging.connector.api.data.RecordConverter;
import io.openmessaging.connector.api.data.RecordOffset;
import io.openmessaging.connector.api.data.SchemaAndValue;
-import org.apache.rocketmq.connect.runtime.common.LoggerName;
+import org.apache.rocketmq.connect.common.constant.LoggerName;
import org.apache.rocketmq.connect.runtime.config.WorkerConfig;
import org.apache.rocketmq.connect.runtime.store.ExtendRecordPartition;
import org.apache.rocketmq.connect.runtime.store.KeyValueStore;
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/AbstractStateManagementService.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/AbstractStateManagementService.java
index edad9f18..c71d439c 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/AbstractStateManagementService.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/AbstractStateManagementService.java
@@ -29,8 +29,8 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.rocketmq.connect.common.constant.LoggerName;
import org.apache.rocketmq.connect.runtime.common.ConnAndTaskStatus;
-import org.apache.rocketmq.connect.runtime.common.LoggerName;
import org.apache.rocketmq.connect.runtime.config.WorkerConfig;
import org.apache.rocketmq.connect.runtime.connectorwrapper.status.AbstractStatus;
import org.apache.rocketmq.connect.runtime.connectorwrapper.status.ConnectorStatus;
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementServiceImpl.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementServiceImpl.java
index 6aeca0ee..480be35e 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementServiceImpl.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementServiceImpl.java
@@ -22,7 +22,7 @@
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.TopicConfig;
-import org.apache.rocketmq.connect.runtime.common.LoggerName;
+import org.apache.rocketmq.connect.common.constant.LoggerName;
import org.apache.rocketmq.connect.runtime.config.WorkerConfig;
import org.apache.rocketmq.connect.runtime.utils.ConnectUtil;
import org.apache.rocketmq.remoting.common.RemotingHelper;
@@ -69,11 +69,9 @@ private void prepare(WorkerConfig connectConfig) {
ConnectUtil.createSubGroup(connectConfig, consumerGroup);
}
String clusterStoreTopic = connectConfig.getClusterStoreTopic();
- if (!ConnectUtil.isTopicExist(connectConfig, clusterStoreTopic)) {
- log.info("try to create cluster store topic: {}!", clusterStoreTopic);
- TopicConfig topicConfig = new TopicConfig(clusterStoreTopic, 1, 1, 6);
- ConnectUtil.createTopic(connectConfig, topicConfig);
- }
+ TopicConfig topicConfig = new TopicConfig(clusterStoreTopic, 1, 1, 6);
+ log.info("try to create cluster store topic: {}!", clusterStoreTopic);
+ ConnectUtil.maybeCreateTopic(connectConfig, topicConfig);
}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/DefaultConnectorContext.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/DefaultConnectorContext.java
index 34e3521b..a739e54f 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/DefaultConnectorContext.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/DefaultConnectorContext.java
@@ -18,8 +18,8 @@
package org.apache.rocketmq.connect.runtime.service;
import io.openmessaging.connector.api.component.connector.ConnectorContext;
+import org.apache.rocketmq.connect.common.constant.LoggerName;
import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
-import org.apache.rocketmq.connect.runtime.common.LoggerName;
import org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerConnector;
import org.apache.rocketmq.connect.runtime.controller.AbstractConnectController;
import org.slf4j.Logger;
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/RebalanceImpl.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/RebalanceImpl.java
index 699c7191..048105ae 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/RebalanceImpl.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/RebalanceImpl.java
@@ -18,9 +18,9 @@
package org.apache.rocketmq.connect.runtime.service;
import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.connect.common.constant.LoggerName;
import org.apache.rocketmq.connect.runtime.common.ConnAndTaskConfigs;
import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
-import org.apache.rocketmq.connect.runtime.common.LoggerName;
import org.apache.rocketmq.connect.runtime.config.WorkerConfig;
import org.apache.rocketmq.connect.runtime.connectorwrapper.Worker;
import org.apache.rocketmq.connect.runtime.controller.AbstractConnectController;
@@ -79,7 +79,7 @@ public void checkClusterStoreTopic() {
String clusterStoreTopic = this.connectController.getConnectConfig().getClusterStoreTopic();
log.info("cluster store topic not exist, try to create it!");
TopicConfig topicConfig = new TopicConfig(clusterStoreTopic, 1, 1, 6);
- ConnectUtil.createTopic(connectConfig, topicConfig);
+ ConnectUtil.maybeCreateTopic(connectConfig, topicConfig);
}
}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/RebalanceService.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/RebalanceService.java
index 8620aa99..cc437322 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/RebalanceService.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/RebalanceService.java
@@ -17,7 +17,7 @@
package org.apache.rocketmq.connect.runtime.service;
-import org.apache.rocketmq.connect.runtime.common.LoggerName;
+import org.apache.rocketmq.connect.common.constant.LoggerName;
import org.apache.rocketmq.connect.runtime.utils.ServiceThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/local/LocalConfigManagementServiceImpl.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/local/LocalConfigManagementServiceImpl.java
index 2b0d2dc3..35fed262 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/local/LocalConfigManagementServiceImpl.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/local/LocalConfigManagementServiceImpl.java
@@ -79,7 +79,7 @@ public void initialize(WorkerConfig workerConfig, RecordConverter converter, Plu
public DataSynchronizer initializationDataSynchronizer(WorkerConfig workerConfig) {
return new BrokerBasedLog<>(workerConfig,
this.topic,
- ConnectUtil.createGroupName(configManagePrefix, workerConfig.getWorkerId()),
+ ConnectUtil.generateGroupName(configManagePrefix, workerConfig.getWorkerId()),
new ConfigChangeCallback(),
Serdes.serdeFrom(String.class),
Serdes.serdeFrom(byte[].class),
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/local/LocalPositionManagementServiceImpl.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/local/LocalPositionManagementServiceImpl.java
index 157a278c..67663623 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/local/LocalPositionManagementServiceImpl.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/local/LocalPositionManagementServiceImpl.java
@@ -60,7 +60,7 @@ public DataSynchronizer initializationDataSynchronizer(WorkerConfig workerConfig
return new BrokerBasedLog(
workerConfig,
super.topic,
- ConnectUtil.createGroupName(super.positionManagePrefix, workerConfig.getWorkerId()),
+ ConnectUtil.generateGroupName(super.positionManagePrefix, workerConfig.getWorkerId()),
new PositionChangeCallback(),
Serdes.serdeFrom(ByteBuffer.class),
Serdes.serdeFrom(ByteBuffer.class),
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/local/LocalStateManagementServiceImpl.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/local/LocalStateManagementServiceImpl.java
index 5598d3be..292b2ef8 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/local/LocalStateManagementServiceImpl.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/local/LocalStateManagementServiceImpl.java
@@ -77,7 +77,7 @@ public void initialize(WorkerConfig config, RecordConverter converter) {
public DataSynchronizer initializationDataSynchronizer(WorkerConfig config) {
return new BrokerBasedLog(config,
statusTopic,
- ConnectUtil.createGroupName(statusManagePrefix, config.getWorkerId()),
+ ConnectUtil.generateGroupName(statusManagePrefix, config.getWorkerId()),
new StatusChangeCallback(),
Serdes.serdeFrom(String.class),
Serdes.serdeFrom(byte[].class),
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/FilePositionManagementServiceImpl.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/FilePositionManagementServiceImpl.java
index 6d8c682e..88024053 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/FilePositionManagementServiceImpl.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/FilePositionManagementServiceImpl.java
@@ -20,7 +20,7 @@
import io.openmessaging.connector.api.data.RecordOffset;
import io.openmessaging.connector.api.errors.ConnectException;
import org.apache.rocketmq.common.utils.ThreadUtils;
-import org.apache.rocketmq.connect.runtime.common.LoggerName;
+import org.apache.rocketmq.connect.common.constant.LoggerName;
import org.apache.rocketmq.connect.runtime.config.WorkerConfig;
import org.apache.rocketmq.connect.runtime.serialization.store.RecordOffsetSerde;
import org.apache.rocketmq.connect.runtime.serialization.store.RecordPartitionSerde;
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryConfigManagementServiceImpl.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryConfigManagementServiceImpl.java
index 4d0d1b76..ca052d6a 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryConfigManagementServiceImpl.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryConfigManagementServiceImpl.java
@@ -22,8 +22,8 @@
import io.openmessaging.connector.api.component.task.source.SourceConnector;
import io.openmessaging.connector.api.data.RecordConverter;
import io.openmessaging.connector.api.errors.ConnectException;
+import org.apache.rocketmq.connect.common.constant.LoggerName;
import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
-import org.apache.rocketmq.connect.runtime.common.LoggerName;
import org.apache.rocketmq.connect.runtime.config.ConnectorConfig;
import org.apache.rocketmq.connect.runtime.config.SinkConnectorConfig;
import org.apache.rocketmq.connect.runtime.config.SourceConnectorConfig;
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryStateManagementServiceImpl.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryStateManagementServiceImpl.java
index 85fa0803..2f7b738a 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryStateManagementServiceImpl.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryStateManagementServiceImpl.java
@@ -17,7 +17,7 @@
package org.apache.rocketmq.connect.runtime.service.memory;
import io.openmessaging.connector.api.data.RecordConverter;
-import org.apache.rocketmq.connect.runtime.common.LoggerName;
+import org.apache.rocketmq.connect.common.constant.LoggerName;
import org.apache.rocketmq.connect.runtime.config.WorkerConfig;
import org.apache.rocketmq.connect.runtime.connectorwrapper.status.ConnectorStatus;
import org.apache.rocketmq.connect.runtime.connectorwrapper.status.TaskStatus;
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/StandaloneRebalanceService.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/StandaloneRebalanceService.java
index dbe39e23..0025e5b1 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/StandaloneRebalanceService.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/StandaloneRebalanceService.java
@@ -17,7 +17,7 @@
package org.apache.rocketmq.connect.runtime.service.memory;
-import org.apache.rocketmq.connect.runtime.common.LoggerName;
+import org.apache.rocketmq.connect.common.constant.LoggerName;
import org.apache.rocketmq.connect.runtime.service.ClusterManagementService;
import org.apache.rocketmq.connect.runtime.service.ConfigManagementService;
import org.apache.rocketmq.connect.runtime.service.RebalanceImpl;
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/rocketmq/RocketMqConfigManagementServiceImpl.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/rocketmq/RocketMqConfigManagementServiceImpl.java
index 899d7cd1..5ac864c3 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/rocketmq/RocketMqConfigManagementServiceImpl.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/rocketmq/RocketMqConfigManagementServiceImpl.java
@@ -53,7 +53,7 @@ public boolean enabledCompactTopic() {
public DataSynchronizer initializationDataSynchronizer(WorkerConfig workerConfig) {
return new BrokerBasedLog<>(workerConfig,
this.topic,
- ConnectUtil.createGroupName(configManagePrefix, workerConfig.getWorkerId()),
+ ConnectUtil.generateGroupName(configManagePrefix, workerConfig.getWorkerId()),
new ConfigChangeCallback(),
Serdes.serdeFrom(String.class),
Serdes.serdeFrom(byte[].class),
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/rocketmq/RocketMqPositionManagementServiceImpl.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/rocketmq/RocketMqPositionManagementServiceImpl.java
index d4ba1381..1ba9c27a 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/rocketmq/RocketMqPositionManagementServiceImpl.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/rocketmq/RocketMqPositionManagementServiceImpl.java
@@ -47,7 +47,7 @@ public DataSynchronizer initializationDataSynchronizer(WorkerConfig workerConfig
return new BrokerBasedLog(
workerConfig,
this.topic,
- ConnectUtil.createGroupName(positionManagePrefix, workerConfig.getWorkerId()),
+ ConnectUtil.generateGroupName(positionManagePrefix, workerConfig.getWorkerId()),
new PositionChangeCallback(),
Serdes.serdeFrom(ByteBuffer.class),
Serdes.serdeFrom(ByteBuffer.class),
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/rocketmq/RocketMqStateManagementServiceImpl.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/rocketmq/RocketMqStateManagementServiceImpl.java
index b8e68566..12e225c0 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/rocketmq/RocketMqStateManagementServiceImpl.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/rocketmq/RocketMqStateManagementServiceImpl.java
@@ -51,7 +51,7 @@ public void initialize(WorkerConfig config, RecordConverter converter) {
public DataSynchronizer initializationDataSynchronizer(WorkerConfig config) {
return new BrokerBasedLog(config,
statusTopic,
- ConnectUtil.createGroupName(statusManagePrefix, config.getWorkerId()),
+ ConnectUtil.generateGroupName(statusManagePrefix, config.getWorkerId()),
new StatusChangeCallback(),
Serdes.serdeFrom(String.class),
Serdes.serdeFrom(byte[].class),
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/strategy/AllocateConnAndTaskStrategyByConsistentHash.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/strategy/AllocateConnAndTaskStrategyByConsistentHash.java
index 11094dab..5c17abc6 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/strategy/AllocateConnAndTaskStrategyByConsistentHash.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/strategy/AllocateConnAndTaskStrategyByConsistentHash.java
@@ -20,9 +20,9 @@
import org.apache.rocketmq.common.consistenthash.ConsistentHashRouter;
import org.apache.rocketmq.common.consistenthash.HashFunction;
import org.apache.rocketmq.common.consistenthash.Node;
+import org.apache.rocketmq.connect.common.constant.LoggerName;
import org.apache.rocketmq.connect.runtime.common.ConnAndTaskConfigs;
import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
-import org.apache.rocketmq.connect.runtime.common.LoggerName;
import org.apache.rocketmq.connect.runtime.config.ConnectorConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/strategy/DefaultAllocateConnAndTaskStrategy.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/strategy/DefaultAllocateConnAndTaskStrategy.java
index a12128d1..aab037e2 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/strategy/DefaultAllocateConnAndTaskStrategy.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/strategy/DefaultAllocateConnAndTaskStrategy.java
@@ -17,9 +17,9 @@
package org.apache.rocketmq.connect.runtime.service.strategy;
+import org.apache.rocketmq.connect.common.constant.LoggerName;
import org.apache.rocketmq.connect.runtime.common.ConnAndTaskConfigs;
import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
-import org.apache.rocketmq.connect.runtime.common.LoggerName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/stats/ConnectStats.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/stats/ConnectStats.java
index 20559871..6d17558b 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/stats/ConnectStats.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/stats/ConnectStats.java
@@ -17,7 +17,7 @@
*/
package org.apache.rocketmq.connect.runtime.stats;
-import org.apache.rocketmq.connect.runtime.common.LoggerName;
+import org.apache.rocketmq.connect.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/stats/ConnectStatsManager.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/stats/ConnectStatsManager.java
index e4450749..2ddb6afc 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/stats/ConnectStatsManager.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/stats/ConnectStatsManager.java
@@ -29,7 +29,7 @@
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
-import static org.apache.rocketmq.connect.runtime.common.LoggerName.ROCKETMQ_CONNECT_STATS;
+import static org.apache.rocketmq.connect.common.constant.LoggerName.ROCKETMQ_CONNECT_STATS;
public class ConnectStatsManager {
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/stats/ConnectStatsService.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/stats/ConnectStatsService.java
index a30259a8..619878b7 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/stats/ConnectStatsService.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/stats/ConnectStatsService.java
@@ -19,7 +19,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.ServiceThread;
-import org.apache.rocketmq.connect.runtime.common.LoggerName;
+import org.apache.rocketmq.connect.common.constant.LoggerName;
import java.text.MessageFormat;
import java.util.HashMap;
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/FileBaseKeyValueStore.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/FileBaseKeyValueStore.java
index 624f4e00..9b2249b2 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/FileBaseKeyValueStore.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/FileBaseKeyValueStore.java
@@ -21,7 +21,7 @@
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
-import org.apache.rocketmq.connect.runtime.common.LoggerName;
+import org.apache.rocketmq.connect.common.constant.LoggerName;
import org.apache.rocketmq.connect.runtime.serialization.Serde;
import org.apache.rocketmq.connect.runtime.utils.Base64Util;
import org.apache.rocketmq.connect.runtime.utils.FileAndPropertyUtil;
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/PositionStorageWriter.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/PositionStorageWriter.java
index e683a155..9281baa7 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/PositionStorageWriter.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/PositionStorageWriter.java
@@ -23,7 +23,7 @@
import io.openmessaging.connector.api.data.RecordPartition;
import io.openmessaging.connector.api.errors.ConnectException;
import io.openmessaging.connector.api.storage.OffsetStorageWriter;
-import org.apache.rocketmq.connect.runtime.common.LoggerName;
+import org.apache.rocketmq.connect.common.constant.LoggerName;
import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
import org.apache.rocketmq.connect.runtime.utils.datasync.DataSynchronizerCallback;
import org.slf4j.Logger;
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java
index 158d6ada..b5300231 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java
@@ -17,64 +17,41 @@
package org.apache.rocketmq.connect.runtime.utils;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
import io.openmessaging.connector.api.data.RecordOffset;
import io.openmessaging.connector.api.data.RecordPartition;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.Set;
import java.util.UUID;
-import java.util.stream.Collectors;
-import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
-import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
-import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
-import org.apache.rocketmq.common.KeyBuilder;
-import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll;
-import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.utils.NetworkUtil;
-import org.apache.rocketmq.connect.runtime.common.LoggerName;
+import org.apache.rocketmq.connect.common.ConsumerConfiguration;
+import org.apache.rocketmq.connect.common.ProducerConfiguration;
+import org.apache.rocketmq.connect.common.RocketMqBaseConfiguration;
+import org.apache.rocketmq.connect.common.RocketMqUtils;
import org.apache.rocketmq.connect.runtime.config.ConnectorConfig;
import org.apache.rocketmq.connect.runtime.config.WorkerConfig;
import org.apache.rocketmq.connect.runtime.service.strategy.AllocateConnAndTaskStrategy;
import org.apache.rocketmq.remoting.RPCHook;
-import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
-import org.apache.rocketmq.remoting.protocol.ResponseCode;
-import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
-import org.apache.rocketmq.remoting.protocol.admin.OffsetWrapper;
import org.apache.rocketmq.remoting.protocol.admin.TopicOffset;
-import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
-import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
-import org.apache.rocketmq.remoting.protocol.body.SubscriptionGroupWrapper;
-import org.apache.rocketmq.remoting.protocol.route.BrokerData;
-import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
-import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
-import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
-import org.apache.rocketmq.tools.command.CommandUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import static org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerSinkTask.QUEUE_OFFSET;
public class ConnectUtil {
- private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
+
public static final String SYS_TASK_CG_PREFIX = "connect-";
- public static String createGroupName(String prefix) {
+ public static String generateGroupName(String prefix) {
StringBuilder sb = new StringBuilder();
sb.append(prefix).append("-");
sb.append(NetworkUtil.getLocalAddress()).append("-");
@@ -83,24 +60,12 @@ public static String createGroupName(String prefix) {
return sb.toString().replace(".", "-");
}
- public static String createGroupName(String prefix, String postfix) {
+ public static String generateGroupName(String prefix, String postfix) {
return new StringBuilder().append(prefix).append("-").append(postfix).toString();
}
- public static String createInstance(String servers) {
- String[] serversArray = servers.split(";");
- List serversList = new ArrayList();
- for (String server : serversArray) {
- if (!serversList.contains(server)) {
- serversList.add(server);
- }
- }
- Collections.sort(serversList);
- return String.valueOf(serversList.toString().hashCode());
- }
-
- public static String createUniqInstance(String prefix) {
- return new StringBuffer(prefix).append("-").append(UUID.randomUUID().toString()).toString();
+ private static String createUniqInstance(String prefix) {
+ return prefix + "-" + UUID.randomUUID();
}
public static AllocateConnAndTaskStrategy initAllocateConnAndTaskStrategy(WorkerConfig connectConfig) {
@@ -111,19 +76,43 @@ public static AllocateConnAndTaskStrategy initAllocateConnAndTaskStrategy(Worker
}
}
+ private static RocketMqBaseConfiguration toBaseConfiguration(WorkerConfig connectConfig, String group) {
+ return RocketMqBaseConfiguration
+ .builder()
+ .namesrvAddr(connectConfig.getNamesrvAddr())
+ .aclEnable(connectConfig.isAclEnable())
+ .accessKey(connectConfig.getAccessKey())
+ .secretKey(connectConfig.getSecretKey())
+ .groupId(group)
+ .build();
+ }
+
+ private static ConsumerConfiguration toConsumerConfiguration(WorkerConfig connectConfig) {
+ return ConsumerConfiguration
+ .consumerBuilder()
+ .namesrvAddr(connectConfig.getNamesrvAddr())
+ .aclEnable(connectConfig.isAclEnable())
+ .accessKey(connectConfig.getAccessKey())
+ .secretKey(connectConfig.getSecretKey())
+ .build();
+ }
+
+ private static ProducerConfiguration toProducerConfiguration(WorkerConfig connectConfig, String group) {
+ return ProducerConfiguration
+ .producerBuilder()
+ .namesrvAddr(connectConfig.getNamesrvAddr())
+ .aclEnable(connectConfig.isAclEnable())
+ .accessKey(connectConfig.getAccessKey())
+ .secretKey(connectConfig.getSecretKey())
+ .groupId(group)
+ .maxMessageSize(ConnectorConfig.MAX_MESSAGE_SIZE)
+ .sendMsgTimeout(connectConfig.getOperationTimeout())
+ .build();
+ }
+
public static DefaultMQProducer initDefaultMQProducer(WorkerConfig connectConfig) {
- RPCHook rpcHook = null;
- if (connectConfig.getAclEnable()) {
- rpcHook = new AclClientRPCHook(new SessionCredentials(connectConfig.getAccessKey(), connectConfig.getSecretKey()));
- }
- DefaultMQProducer producer = new DefaultMQProducer(rpcHook);
- producer.setNamesrvAddr(connectConfig.getNamesrvAddr());
- producer.setInstanceName(createUniqInstance(connectConfig.getNamesrvAddr()));
- producer.setProducerGroup(connectConfig.getRmqProducerGroup());
- producer.setSendMsgTimeout(connectConfig.getOperationTimeout());
- producer.setMaxMessageSize(ConnectorConfig.MAX_MESSAGE_SIZE);
- producer.setLanguage(LanguageCode.JAVA);
- return producer;
+ ProducerConfiguration producerConfiguration = toProducerConfiguration(connectConfig, connectConfig.getRmqProducerGroup());
+ return RocketMqUtils.initDefaultMQProducer(producerConfiguration);
}
public static DefaultMQPullConsumer initDefaultMQPullConsumer(WorkerConfig connectConfig) {
@@ -142,130 +131,52 @@ public static DefaultMQPullConsumer initDefaultMQPullConsumer(WorkerConfig conne
return consumer;
}
- public static DefaultMQPushConsumer initDefaultMQPushConsumer(WorkerConfig connectConfig) {
- RPCHook rpcHook = null;
- if (connectConfig.getAclEnable()) {
- rpcHook = new AclClientRPCHook(new SessionCredentials(connectConfig.getAccessKey(), connectConfig.getSecretKey()));
- }
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(rpcHook);
- consumer.setNamesrvAddr(connectConfig.getNamesrvAddr());
- consumer.setInstanceName(createUniqInstance(connectConfig.getNamesrvAddr()));
- consumer.setConsumerGroup(createGroupName(connectConfig.getRmqConsumerGroup()));
- consumer.setMaxReconsumeTimes(connectConfig.getRmqMaxRedeliveryTimes());
- consumer.setConsumeTimeout(connectConfig.getRmqMessageConsumeTimeout());
- consumer.setConsumeThreadMin(connectConfig.getRmqMinConsumeThreadNums());
- consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
- consumer.setLanguage(LanguageCode.JAVA);
- return consumer;
+ public static void maybeCreateTopic(WorkerConfig connectConfig, TopicConfig topicConfig) {
+ RocketMqUtils.maybeCreateTopic(toBaseConfiguration(connectConfig, connectConfig.getAdminExtGroup()), topicConfig);
}
- public static DefaultMQAdminExt startMQAdminTool(WorkerConfig connectConfig) throws MQClientException {
- RPCHook rpcHook = null;
- if (connectConfig.getAclEnable()) {
- rpcHook = new AclClientRPCHook(new SessionCredentials(connectConfig.getAccessKey(), connectConfig.getSecretKey()));
- }
- DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
- defaultMQAdminExt.setNamesrvAddr(connectConfig.getNamesrvAddr());
- defaultMQAdminExt.setAdminExtGroup(connectConfig.getAdminExtGroup());
- defaultMQAdminExt.setInstanceName(ConnectUtil.createUniqInstance(connectConfig.getNamesrvAddr()));
- defaultMQAdminExt.start();
- return defaultMQAdminExt;
+ public static boolean isTopicExist(WorkerConfig connectConfig, String topic) {
+ return RocketMqUtils.isTopicExist(toBaseConfiguration(connectConfig, connectConfig.getAdminExtGroup()), topic);
}
- public static void createTopic(WorkerConfig connectConfig, TopicConfig topicConfig) {
- DefaultMQAdminExt defaultMQAdminExt = null;
- try {
- defaultMQAdminExt = startMQAdminTool(connectConfig);
- ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
- Map> clusterAddrTable = clusterInfo.getClusterAddrTable();
- Set clusterNameSet = clusterAddrTable.keySet();
- for (String clusterName : clusterNameSet) {
- Set masterSet = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
- for (String addr : masterSet) {
- defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
- }
- }
- } catch (Exception e) {
- throw new RuntimeException("Create topic [" + topicConfig.getTopicName() + "] failed", e);
- } finally {
- if (defaultMQAdminExt != null) {
- defaultMQAdminExt.shutdown();
- }
- }
+ public static Set fetchAllConsumerGroupList(WorkerConfig connectConfig) {
+ RocketMqBaseConfiguration baseConfiguration = toBaseConfiguration(connectConfig, connectConfig.getAdminExtGroup());
+ return RocketMqUtils.fetchAllConsumerGroup(baseConfiguration);
}
- public static boolean isTopicExist(WorkerConfig connectConfig, String topic) {
- DefaultMQAdminExt defaultMQAdminExt = null;
- boolean foundTopicRouteInfo = false;
- try {
- defaultMQAdminExt = startMQAdminTool(connectConfig);
- TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo(topic);
- if (topicRouteData != null) {
- foundTopicRouteInfo = true;
- }
- } catch (Exception e) {
- if (e instanceof MQClientException) {
- if (((MQClientException) e).getResponseCode() == ResponseCode.TOPIC_NOT_EXIST) {
- foundTopicRouteInfo = false;
- } else {
- throw new RuntimeException("Get topic route info failed", e);
- }
- } else {
- throw new RuntimeException("Get topic route info failed", e);
- }
- } finally {
- if (defaultMQAdminExt != null) {
- defaultMQAdminExt.shutdown();
- }
- }
- return foundTopicRouteInfo;
+ public static String createSubGroup(WorkerConfig connectConfig, String subGroup) {
+ RocketMqBaseConfiguration baseConfiguration = toBaseConfiguration(connectConfig, connectConfig.getAdminExtGroup());
+ return RocketMqUtils.createGroup(baseConfiguration, subGroup);
}
- public static Set fetchAllConsumerGroupList(WorkerConfig connectConfig) {
- Set consumerGroupSet = Sets.newHashSet();
- DefaultMQAdminExt defaultMQAdminExt = null;
- try {
- defaultMQAdminExt = startMQAdminTool(connectConfig);
- ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
- for (BrokerData brokerData : clusterInfo.getBrokerAddrTable().values()) {
- SubscriptionGroupWrapper subscriptionGroupWrapper = defaultMQAdminExt.getAllSubscriptionGroup(brokerData.selectBrokerAddr(), 3000L);
- consumerGroupSet.addAll(subscriptionGroupWrapper.getSubscriptionGroupTable().keySet());
- }
- } catch (Exception e) {
- throw new RuntimeException("Fetch all topic failed", e);
- } finally {
- if (defaultMQAdminExt != null) {
- defaultMQAdminExt.shutdown();
- }
- }
- return consumerGroupSet;
+ /**
+ * init default lite pull consumer
+ *
+ * @param connectConfig
+ * @return
+ * @throws MQClientException
+ */
+ public static DefaultLitePullConsumer initDefaultLitePullConsumer(WorkerConfig connectConfig, boolean autoCommit) {
+ ConsumerConfiguration consumerConfiguration = toConsumerConfiguration(connectConfig);
+ return RocketMqUtils.initDefaultLitePullConsumer(consumerConfiguration, autoCommit);
}
- public static String createSubGroup(WorkerConfig connectConfig, String subGroup) {
- DefaultMQAdminExt defaultMQAdminExt = null;
- try {
- defaultMQAdminExt = startMQAdminTool(connectConfig);
- SubscriptionGroupConfig initConfig = new SubscriptionGroupConfig();
- initConfig.setGroupName(subGroup);
- ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
- Map> clusterAddrTable = clusterInfo.getClusterAddrTable();
- Set clusterNameSet = clusterAddrTable.keySet();
- for (String clusterName : clusterNameSet) {
- Set masterSet = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
- for (String addr : masterSet) {
- defaultMQAdminExt.createAndUpdateSubscriptionGroupConfig(addr, initConfig);
- }
- }
- } catch (Exception e) {
- throw new RuntimeException("create subGroup: " + subGroup + " failed", e);
- } finally {
- if (defaultMQAdminExt != null) {
- defaultMQAdminExt.shutdown();
- }
- }
- return subGroup;
+ /**
+ * Get topic offsets
+ */
+ public static Map> offsetTopics(WorkerConfig config, List topics) {
+ RocketMqBaseConfiguration baseConfiguration = toBaseConfiguration(config, config.getAdminExtGroup());
+ return RocketMqUtils.offsetTopics(baseConfiguration, topics);
}
+ /**
+ * Get consumer group offset
+ */
+ public static Map currentOffsets(WorkerConfig config, String groupName, List topics,
+ Set messageQueues) {
+ RocketMqBaseConfiguration baseConfiguration = toBaseConfiguration(config, config.getAdminExtGroup());
+ return RocketMqUtils.currentOffsets(baseConfiguration, groupName, topics, messageQueues);
+ }
public static RecordPartition convertToRecordPartition(MessageQueue messageQueue) {
Map map = new HashMap<>();
@@ -319,247 +230,4 @@ public static RecordPartition convertToRecordPartition(String topic, String brok
return recordPartition;
}
- /**
- * init default lite pull consumer
- *
- * @param connectConfig
- * @return
- * @throws MQClientException
- */
- public static DefaultLitePullConsumer initDefaultLitePullConsumer(WorkerConfig connectConfig, boolean autoCommit) {
- DefaultLitePullConsumer consumer = null;
- if (Objects.isNull(consumer)) {
- if (StringUtils.isBlank(connectConfig.getAccessKey()) && StringUtils.isBlank(connectConfig.getSecretKey())) {
- consumer = new DefaultLitePullConsumer();
- } else {
- consumer = new DefaultLitePullConsumer(getAclRPCHook(connectConfig.getAccessKey(), connectConfig.getSecretKey()));
- }
- }
- consumer.setNamesrvAddr(connectConfig.getNamesrvAddr());
- String uniqueName = Thread.currentThread().getName() + "-" + System.currentTimeMillis() % 1000;
- consumer.setInstanceName(uniqueName);
- consumer.setUnitName(uniqueName);
- consumer.setAutoCommit(autoCommit);
- return consumer;
- }
-
- private static RPCHook getAclRPCHook(String accessKey, String secretKey) {
- return new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
- }
-
- /**
- * Get topic offsets
- */
- public static Map> offsetTopics(
- WorkerConfig config, List topics) {
- Map> offsets = Maps.newConcurrentMap();
- DefaultMQAdminExt adminClient = null;
- try {
- adminClient = startMQAdminTool(config);
- for (String topic : topics) {
- TopicStatsTable topicStatsTable = examineTopicStats(adminClient, topic);
- offsets.put(topic, topicStatsTable.getOffsetTable());
- }
- return offsets;
- } catch (Exception e) {
- throw new RuntimeException(e);
- } finally {
- if (adminClient != null) {
- adminClient.shutdown();
- }
- }
- }
-
-
- /** Get consumer group offset */
- public static Map currentOffsets(WorkerConfig config, String groupName, List topics, Set messageQueues) {
- // Get consumer group offset
- DefaultMQAdminExt adminClient = null;
- try {
- adminClient = startMQAdminTool(config);
- Map consumerOffsets = Maps.newConcurrentMap();
- for (String topic : topics) {
- ConsumeStats consumeStats = examineConsumeStats(adminClient, groupName, topic);
- consumerOffsets.putAll(consumeStats.getOffsetTable());
- }
- return consumerOffsets.keySet().stream()
- .filter(messageQueue -> messageQueues.contains(messageQueue))
- .collect(
- Collectors.toMap(
- messageQueue -> messageQueue,
- messageQueue ->
- consumerOffsets.get(messageQueue).getConsumerOffset()));
- } catch (MQClientException e) {
- if (e instanceof MQClientException) {
- if (e.getResponseCode() == ResponseCode.TOPIC_NOT_EXIST) {
- return Collections.emptyMap();
- } else {
- throw new RuntimeException(e);
- }
- } else {
- throw new RuntimeException(e);
- }
- } finally {
- if (adminClient != null) {
- adminClient.shutdown();
- }
- }
- }
-
- /**
- * Compatible with 4.9.4 and earlier
- *
- * @param adminClient
- * @param topic
- * @return
- */
- private static TopicStatsTable examineTopicStats(DefaultMQAdminExt adminClient, String topic) {
- try {
- return adminClient.examineTopicStats(topic);
- } catch (MQBrokerException e) {
- // Compatible with 4.9.4 and earlier
- if (e.getResponseCode() == ResponseCode.REQUEST_CODE_NOT_SUPPORTED) {
- try {
- log.warn("Examine topic stats failure , the server version is less than 5.1.0, and downward compatibility begins, {}", e.getErrorMessage());
- return overrideExamineTopicStats(adminClient, topic);
- } catch (Exception ex) {
- throw new RuntimeException(ex);
- }
- } else {
- throw new RuntimeException(e);
- }
- } catch (Exception ex) {
- throw new RuntimeException(ex);
- }
- }
-
- /**
- * examineConsumeStats
- * Compatible with 4.9.4 and earlier
- *
- * @param adminClient
- * @param topic
- * @return
- */
- private static ConsumeStats examineConsumeStats(DefaultMQAdminExt adminClient, String groupName, String topic) {
- try {
- return adminClient.examineConsumeStats(groupName, topic);
- } catch (MQBrokerException e) {
- // Compatible with 4.9.4 and earlier
- if (e.getResponseCode() == ResponseCode.REQUEST_CODE_NOT_SUPPORTED) {
- try {
- log.warn("Examine consume stats failure, the server version is less than 5.1.0, and downward compatibility begins {}", e.getErrorMessage());
- return overrideExamineConsumeStats(adminClient, groupName, topic);
- } catch (Exception ex) {
- throw new RuntimeException(ex);
- }
- } else {
- throw new RuntimeException(e);
- }
- } catch (Exception ex) {
- throw new RuntimeException(ex);
- }
- }
-
- /**
- * Compatible with version 4.9.4
- *
- * @param adminClient
- * @param topic
- * @return
- * @throws RemotingException
- * @throws InterruptedException
- * @throws MQClientException
- * @throws MQBrokerException
- */
- private static TopicStatsTable overrideExamineTopicStats(DefaultMQAdminExt adminClient,
- String topic) throws RemotingException, InterruptedException, MQClientException, MQBrokerException {
- TopicRouteData topicRouteData = adminClient.examineTopicRouteInfo(topic);
- TopicStatsTable topicStatsTable = new TopicStatsTable();
- for (BrokerData bd : topicRouteData.getBrokerDatas()) {
- String addr = bd.selectBrokerAddr();
- if (addr != null) {
- TopicStatsTable tst = adminClient
- .getDefaultMQAdminExtImpl()
- .getMqClientInstance()
- .getMQClientAPIImpl()
- .getTopicStatsInfo(addr, topic, 5000);
- topicStatsTable.getOffsetTable().putAll(tst.getOffsetTable());
- }
- }
- return topicStatsTable;
- }
-
- /**
- * Compatible with version 4.9.4
- *
- * @param adminExt
- * @param groupName
- * @param topic
- * @return
- * @throws MQClientException
- * @throws RemotingException
- * @throws InterruptedException
- * @throws MQBrokerException
- */
- private static ConsumeStats overrideExamineConsumeStats(DefaultMQAdminExt adminExt, String groupName,
- String topic) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
- TopicRouteData topicRouteData = null;
- List routeTopics = new ArrayList<>();
- routeTopics.add(MixAll.getRetryTopic(groupName));
- if (topic != null) {
- routeTopics.add(topic);
- routeTopics.add(KeyBuilder.buildPopRetryTopic(topic, groupName));
- }
- for (int i = 0; i < routeTopics.size(); i++) {
- try {
- topicRouteData = adminExt.getDefaultMQAdminExtImpl().examineTopicRouteInfo(routeTopics.get(i));
- if (topicRouteData != null) {
- break;
- }
- } catch (Throwable e) {
- if (i == routeTopics.size() - 1) {
- throw e;
- }
- }
- }
- ConsumeStats result = new ConsumeStats();
-
- for (BrokerData bd : topicRouteData.getBrokerDatas()) {
- String addr = bd.selectBrokerAddr();
- if (addr != null) {
- ConsumeStats consumeStats = adminExt.getDefaultMQAdminExtImpl().getMqClientInstance().getMQClientAPIImpl().getConsumeStats(addr, groupName, topic, 5000 * 3);
- result.getOffsetTable().putAll(consumeStats.getOffsetTable());
- double value = result.getConsumeTps() + consumeStats.getConsumeTps();
- result.setConsumeTps(value);
- }
- }
-
- Set topics = Sets.newHashSet();
- for (MessageQueue messageQueue : result.getOffsetTable().keySet()) {
- topics.add(messageQueue.getTopic());
- }
-
- ConsumeStats staticResult = new ConsumeStats();
- staticResult.setConsumeTps(result.getConsumeTps());
-
- for (String currentTopic : topics) {
- TopicRouteData currentRoute = adminExt.getDefaultMQAdminExtImpl().examineTopicRouteInfo(currentTopic);
- if (currentRoute.getTopicQueueMappingByBroker() == null
- || currentRoute.getTopicQueueMappingByBroker().isEmpty()) {
- //normal topic
- for (Map.Entry entry : result.getOffsetTable().entrySet()) {
- if (entry.getKey().getTopic().equals(currentTopic)) {
- staticResult.getOffsetTable().put(entry.getKey(), entry.getValue());
- }
- }
- }
- }
-
- if (staticResult.getOffsetTable().isEmpty()) {
- throw new MQClientException(ResponseCode.CONSUMER_NOT_ONLINE, "Not found the consumer group consume stats, because return offset table is empty, maybe the consumer not consume any message");
- }
- return staticResult;
- }
-
}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ServiceThread.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ServiceThread.java
index 3dacf92b..1c749f63 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ServiceThread.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ServiceThread.java
@@ -17,7 +17,7 @@
package org.apache.rocketmq.connect.runtime.utils;
-import org.apache.rocketmq.connect.runtime.common.LoggerName;
+import org.apache.rocketmq.connect.common.constant.LoggerName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/datasync/BrokerBasedLog.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/datasync/BrokerBasedLog.java
index 01eda152..f97e8304 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/datasync/BrokerBasedLog.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/datasync/BrokerBasedLog.java
@@ -39,7 +39,7 @@
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.connect.runtime.common.LoggerName;
+import org.apache.rocketmq.connect.common.constant.LoggerName;
import org.apache.rocketmq.connect.runtime.config.WorkerConfig;
import org.apache.rocketmq.connect.runtime.serialization.Serde;
import org.apache.rocketmq.connect.runtime.utils.Base64Util;
@@ -135,16 +135,13 @@ private void prepare() {
log.info("Try to create group: {}!", groupName);
ConnectUtil.createSubGroup(workerConfig, groupName);
}
- if (!ConnectUtil.isTopicExist(workerConfig, topicName)) {
- log.info("Try to create store topic: {}!", topicName);
- TopicConfig topicConfig = new TopicConfig(topicName, 1, 1, PermName.PERM_READ | PermName.PERM_WRITE);
- if (enabledCompactTopic) {
- Map attributes = Maps.newConcurrentMap();
- attributes.put("+cleanup.policy", "COMPACTION");
- topicConfig.setAttributes(attributes);
- }
- ConnectUtil.createTopic(workerConfig, topicConfig);
+ TopicConfig topicConfig = new TopicConfig(topicName, 1, 1, PermName.PERM_READ | PermName.PERM_WRITE);
+ if (enabledCompactTopic) {
+ Map attributes = Maps.newConcurrentMap();
+ attributes.put("+cleanup.policy", "COMPACTION");
+ topicConfig.setAttributes(attributes);
}
+ ConnectUtil.maybeCreateTopic(workerConfig, topicConfig);
}
private void initializationAndStartConsumer(WorkerConfig workerConfig, String groupName) {
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/common/cache/LRUCacheTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/common/cache/LRUCacheTest.java
index 00a45ca0..a2718943 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/common/cache/LRUCacheTest.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/common/cache/LRUCacheTest.java
@@ -18,6 +18,7 @@
package org.apache.rocketmq.connect.runtime.common.cache;
import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.connect.common.cache.LRUCache;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtilTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtilTest.java
index 25bd9b6e..b1bd48be 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtilTest.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtilTest.java
@@ -19,17 +19,12 @@
import io.openmessaging.connector.api.data.RecordOffset;
import io.openmessaging.connector.api.data.RecordPartition;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
-import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
import org.apache.rocketmq.connect.runtime.config.WorkerConfig;
import org.apache.rocketmq.connect.runtime.connectorwrapper.NameServerMocker;
import org.apache.rocketmq.connect.runtime.connectorwrapper.ServerResponseMocker;
@@ -69,29 +64,13 @@ public void after() {
@Test
public void createGroupNameTest() {
- final String groupName1 = ConnectUtil.createGroupName(TEST_GROUP);
+ final String groupName1 = ConnectUtil.generateGroupName(TEST_GROUP);
Assert.assertTrue(groupName1.startsWith(TEST_GROUP));
- final String groupName2 = ConnectUtil.createGroupName(TEST_GROUP, "group");
+ final String groupName2 = ConnectUtil.generateGroupName(TEST_GROUP, "group");
Assert.assertTrue(groupName2.endsWith("group"));
}
- @Test
- public void createInstanceTest() {
- String servers = "localhost:9876;localhost:9877";
- final String instance = ConnectUtil.createInstance(servers);
- List serverList = new ArrayList<>();
- serverList.add(NAME_SERVER_ADDR);
- serverList.add("localhost:9877");
- Assert.assertTrue(instance.equals(String.valueOf(serverList.toString().hashCode())));
- }
-
- @Test
- public void createUniqInstanceTest() {
- final String instance = ConnectUtil.createUniqInstance(TEST_GROUP);
- Assert.assertTrue(instance.startsWith(TEST_GROUP));
- }
-
@Test
public void initAllocateConnAndTaskStrategyTest(){
WorkerConfig connectConfig = new WorkerConfig();
@@ -110,35 +89,13 @@ public void initDefaultMQProducerTest() {
Assert.assertEquals(5000, producer.getPersistConsumerOffsetInterval());
}
- @Test
- public void initDefaultMQPushConsumerTest() {
- WorkerConfig connectConfig = new WorkerConfig();
- connectConfig.setNamesrvAddr(NAME_SERVER_ADDR);
- final DefaultMQPushConsumer consumer = ConnectUtil.initDefaultMQPushConsumer(connectConfig);
- Assert.assertEquals(NAME_SERVER_ADDR, consumer.getNamesrvAddr());
- Assert.assertEquals(30000, consumer.getPollNameServerInterval());
- Assert.assertEquals(30000, consumer.getHeartbeatBrokerInterval());
- Assert.assertEquals(5000, consumer.getPersistConsumerOffsetInterval());
- }
-
- @Test
- public void startMQAdminToolTest() throws MQClientException {
- WorkerConfig connectConfig = new WorkerConfig();
- connectConfig.setNamesrvAddr(NAME_SERVER_ADDR);
- final DefaultMQAdminExt defaultMQAdminExt = ConnectUtil.startMQAdminTool(connectConfig);
- Assert.assertEquals(NAME_SERVER_ADDR, defaultMQAdminExt.getNamesrvAddr());
- Assert.assertEquals(30000, defaultMQAdminExt.getPollNameServerInterval());
- Assert.assertEquals(30000, defaultMQAdminExt.getHeartbeatBrokerInterval());
- Assert.assertEquals(5000, defaultMQAdminExt.getPersistConsumerOffsetInterval());
- }
-
@Test
public void createTopicTest() {
WorkerConfig connectConfig = new WorkerConfig();
connectConfig.setNamesrvAddr(NAME_SERVER_ADDR);
TopicConfig topicConfig = new TopicConfig();
topicConfig.setTopicName(TEST_TOPIC);
- Assertions.assertThatCode(() -> ConnectUtil.createTopic(connectConfig, topicConfig)).doesNotThrowAnyException();
+ Assertions.assertThatCode(() -> ConnectUtil.maybeCreateTopic(connectConfig, topicConfig)).doesNotThrowAnyException();
final boolean exist = ConnectUtil.isTopicExist(connectConfig, TEST_TOPIC);
Assert.assertTrue(exist);