diff --git a/connectors/rocketmq-connect-activemq/pom.xml b/connectors/rocketmq-connect-activemq/pom.xml
index 738bf7cf..2b5d350f 100644
--- a/connectors/rocketmq-connect-activemq/pom.xml
+++ b/connectors/rocketmq-connect-activemq/pom.xml
@@ -207,7 +207,7 @@
org.apache.rocketmq
rocketmq-openmessaging
- 4.3.2
+ 4.9.4
commons-cli
diff --git a/connectors/rocketmq-connect-cassandra/pom.xml b/connectors/rocketmq-connect-cassandra/pom.xml
index ba30cefe..4071ab7f 100644
--- a/connectors/rocketmq-connect-cassandra/pom.xml
+++ b/connectors/rocketmq-connect-cassandra/pom.xml
@@ -245,7 +245,7 @@
org.apache.rocketmq
rocketmq-openmessaging
- 4.3.2
+ 4.9.4
commons-cli
diff --git a/connectors/rocketmq-connect-debezium/pom.xml b/connectors/rocketmq-connect-debezium/pom.xml
index 4c7d6c5e..eabe0d83 100644
--- a/connectors/rocketmq-connect-debezium/pom.xml
+++ b/connectors/rocketmq-connect-debezium/pom.xml
@@ -180,8 +180,7 @@
1.7.2.Final
42.3.3
- 4.7.1
- 4.3.2
+ 5.1.0
0.1.4
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMqAdminUtil.java b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMqAdminUtil.java
index c1ae1cbe..25c59534 100644
--- a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMqAdminUtil.java
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMqAdminUtil.java
@@ -16,6 +16,11 @@
*/
package org.apache.rocketmq.connect.debezium;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
@@ -24,28 +29,22 @@
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.TopicConfig;
-import org.apache.rocketmq.common.admin.TopicOffset;
-import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.common.protocol.body.ClusterInfo;
-import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
-import org.apache.rocketmq.common.protocol.route.BrokerData;
-import org.apache.rocketmq.common.protocol.route.TopicRouteData;
-import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.remoting.protocol.ResponseCode;
+import org.apache.rocketmq.remoting.protocol.admin.TopicOffset;
+import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
+import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
+import org.apache.rocketmq.remoting.protocol.body.SubscriptionGroupWrapper;
+import org.apache.rocketmq.remoting.protocol.route.BrokerData;
+import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
+import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.UUID;
-
/**
* Tools for creating RocketMq topic and group
*/
@@ -130,7 +129,7 @@ public static void createTopic(RocketMqConfig config, TopicConfig topicConfig) {
try {
defaultMQAdminExt = startMQAdminTool(config);
ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
- HashMap> clusterAddrTable = clusterInfo.getClusterAddrTable();
+ Map> clusterAddrTable = clusterInfo.getClusterAddrTable();
Set clusterNameSet = clusterAddrTable.keySet();
for (String clusterName : clusterNameSet) {
Set masterSet = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
@@ -201,7 +200,7 @@ public static String createSubGroup(RocketMqConfig connectConfig, String subGrou
SubscriptionGroupConfig initConfig = new SubscriptionGroupConfig();
initConfig.setGroupName(subGroup);
ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
- HashMap> clusterAddrTable = clusterInfo.getClusterAddrTable();
+ Map> clusterAddrTable = clusterInfo.getClusterAddrTable();
Set clusterNameSet = clusterAddrTable.keySet();
for (String clusterName : clusterNameSet) {
Set masterSet = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
@@ -231,9 +230,9 @@ public static Map offsets(RocketMqConfig config, Stri
DefaultMQAdminExt adminClient = null;
try {
adminClient = RocketMqAdminUtil.startMQAdminTool(config);
- TopicStatsTable topicStatsTable = adminClient.examineTopicStats(topic);
+ TopicStatsTable topicStatsTable = examineTopicStats(adminClient,topic);
return topicStatsTable.getOffsetTable();
- } catch (MQClientException | MQBrokerException | RemotingException | InterruptedException e) {
+ } catch (Exception e) {
throw new RuntimeException(e);
} finally {
if (adminClient != null) {
@@ -242,4 +241,59 @@ public static Map offsets(RocketMqConfig config, Stri
}
}
+ /**
+ * Compatible with 4.9.4 and earlier
+ *
+ * @param adminClient
+ * @param topic
+ * @return
+ */
+ private static TopicStatsTable examineTopicStats(DefaultMQAdminExt adminClient, String topic) {
+ try {
+ return adminClient.examineTopicStats(topic);
+ } catch (MQBrokerException e) {
+ // Compatible with 4.9.4 and earlier
+ if (e.getResponseCode() == ResponseCode.REQUEST_CODE_NOT_SUPPORTED) {
+ try {
+ return overrideExamineTopicStats(adminClient, topic);
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ } else {
+ throw new RuntimeException(e);
+ }
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ /**
+ * Compatible with version 4.9.4
+ *
+ * @param adminClient
+ * @param topic
+ * @return
+ * @throws RemotingException
+ * @throws InterruptedException
+ * @throws MQClientException
+ * @throws MQBrokerException
+ */
+ private static TopicStatsTable overrideExamineTopicStats(DefaultMQAdminExt adminClient,
+ String topic) throws RemotingException, InterruptedException, MQClientException, MQBrokerException {
+ TopicRouteData topicRouteData = adminClient.examineTopicRouteInfo(topic);
+ TopicStatsTable topicStatsTable = new TopicStatsTable();
+ for (BrokerData bd : topicRouteData.getBrokerDatas()) {
+ String addr = bd.selectBrokerAddr();
+ if (addr != null) {
+ TopicStatsTable tst = adminClient
+ .getDefaultMQAdminExtImpl()
+ .getMqClientInstance()
+ .getMQClientAPIImpl()
+ .getTopicStatsInfo(addr, topic, 5000);
+ topicStatsTable.getOffsetTable().putAll(tst.getOffsetTable());
+ }
+ }
+ return topicStatsTable;
+ }
+
}
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMqDatabaseHistory.java b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMqDatabaseHistory.java
index 7fd7941e..54f8458b 100644
--- a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMqDatabaseHistory.java
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMqDatabaseHistory.java
@@ -34,12 +34,12 @@
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.TopicConfig;
-import org.apache.rocketmq.common.admin.TopicOffset;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.connect.kafka.connect.adaptor.task.AbstractKafkaConnectSource;
import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.protocol.admin.TopicOffset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/connectors/rocketmq-connect-deltalake/pom.xml b/connectors/rocketmq-connect-deltalake/pom.xml
index 245395ab..1333c343 100644
--- a/connectors/rocketmq-connect-deltalake/pom.xml
+++ b/connectors/rocketmq-connect-deltalake/pom.xml
@@ -231,7 +231,7 @@
org.apache.rocketmq
rocketmq-openmessaging
- 4.3.2
+ 4.9.4
diff --git a/connectors/rocketmq-connect-hudi/README.md b/connectors/rocketmq-connect-hudi/README.md
index ca73a68f..5a96edd0 100644
--- a/connectors/rocketmq-connect-hudi/README.md
+++ b/connectors/rocketmq-connect-hudi/README.md
@@ -72,7 +72,7 @@ http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-hudi-connector-name}/
* **spark-submit 启动任务**
将connect-runtime打包后通过spark-submit提交任务
```
-nohup sh spark-submit --class org.apache.rocketmq.connect.runtime.DistributedConnectStartup --conf "spark.driver.extraJavaOptions=-Dlogback.configurationFile=logback.xml" --files /xxx/conf/connect.conf,/xxx/conf/log4j.properties --packages org.apache.hudi:hudi-spark3-bundle_2.12:0.8.0,org.apache.spark:spark-avro_2.12:3.0.1,org.apache.hudi:hudi-java-client:0.8.0,org.apache.parquet:parquet-avro:1.10.1,org.apache.avro:avro:1.10.2,com.alibaba:fastjson:1.2.51,org.reflections:reflections:0.9.11,org.apache.httpcomponents:httpclient:4.5.5,io.openmessaging:openmessaging-connector:0.1.1,commons-cli:commons-cli:1.1,org.apache.rocketmq:rocketmq-client:4.4.0,org.apache.rocketmq:rocketmq-tools:4.4.0,org.apache.rocketmq:rocketmq-remoting:4.4.0,org.apache.rocketmq:rocketmq-openmessaging:4.3.2,org.slf4j:slf4j-api:1.7.7,com.google.guava:guava:20.0,org.apache.hadoop:hadoop-common:3.3.1,org.reflections:reflections:0.9.12,org.apache.hive:hive-exec:2.3.7 --conf 'spark.executor.userClassPathFirst=true' --conf 'spark.driver.userClassPathFirst=true' --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' /xxx/rocketmq/rocketmq-connect-runtime-0.0.1-SNAPSHOT.jar &
+nohup sh spark-submit --class org.apache.rocketmq.connect.runtime.DistributedConnectStartup --conf "spark.driver.extraJavaOptions=-Dlogback.configurationFile=logback.xml" --files /xxx/conf/connect.conf,/xxx/conf/log4j.properties --packages org.apache.hudi:hudi-spark3-bundle_2.12:0.8.0,org.apache.spark:spark-avro_2.12:3.0.1,org.apache.hudi:hudi-java-client:0.8.0,org.apache.parquet:parquet-avro:1.10.1,org.apache.avro:avro:1.10.2,com.alibaba:fastjson:1.2.51,org.reflections:reflections:0.9.11,org.apache.httpcomponents:httpclient:4.5.5,io.openmessaging:openmessaging-connector:0.1.1,commons-cli:commons-cli:1.1,org.apache.rocketmq:rocketmq-client:4.4.0,org.apache.rocketmq:rocketmq-tools:4.4.0,org.apache.rocketmq:rocketmq-remoting:4.4.0,org.apache.rocketmq:rocketmq-openmessaging:4.9.4,org.slf4j:slf4j-api:1.7.7,com.google.guava:guava:20.0,org.apache.hadoop:hadoop-common:3.3.1,org.reflections:reflections:0.9.12,org.apache.hive:hive-exec:2.3.7 --conf 'spark.executor.userClassPathFirst=true' --conf 'spark.driver.userClassPathFirst=true' --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' /xxx/rocketmq/rocketmq-connect-runtime-0.0.1-SNAPSHOT.jar &
```
后续操作参考rocketmq-connect-hudi启动步骤
diff --git a/connectors/rocketmq-connect-hudi/pom.xml b/connectors/rocketmq-connect-hudi/pom.xml
index a8d03d12..b5bddecd 100644
--- a/connectors/rocketmq-connect-hudi/pom.xml
+++ b/connectors/rocketmq-connect-hudi/pom.xml
@@ -15,7 +15,7 @@
1.8
1.8
- 4.7.1
+ 4.9.4
0.8.0
1.10.2
@@ -231,7 +231,7 @@
org.apache.rocketmq
rocketmq-openmessaging
- 4.3.2
+ 4.9.4
org.apache.hudi
diff --git a/connectors/rocketmq-connect-jdbc/pom.xml b/connectors/rocketmq-connect-jdbc/pom.xml
index 1d7db045..d7635e33 100644
--- a/connectors/rocketmq-connect-jdbc/pom.xml
+++ b/connectors/rocketmq-connect-jdbc/pom.xml
@@ -46,7 +46,7 @@
1.8
1.8
- 4.7.1
+ 4.9.4
4.13.1
diff --git a/connectors/rocketmq-connect-jms/pom.xml b/connectors/rocketmq-connect-jms/pom.xml
index 329bbf00..3b3967a4 100644
--- a/connectors/rocketmq-connect-jms/pom.xml
+++ b/connectors/rocketmq-connect-jms/pom.xml
@@ -188,7 +188,7 @@
org.apache.rocketmq
rocketmq-openmessaging
- 4.3.2
+ 4.9.4
commons-cli
diff --git a/connectors/rocketmq-connect-kafka/pom.xml b/connectors/rocketmq-connect-kafka/pom.xml
index 2d2e0c2d..83af208c 100644
--- a/connectors/rocketmq-connect-kafka/pom.xml
+++ b/connectors/rocketmq-connect-kafka/pom.xml
@@ -179,7 +179,7 @@
org.apache.rocketmq
rocketmq-openmessaging
- 4.3.2
+ 4.9.4
com.alibaba
diff --git a/connectors/rocketmq-connect-mongo/pom.xml b/connectors/rocketmq-connect-mongo/pom.xml
index 9f9c1be5..c94ace67 100644
--- a/connectors/rocketmq-connect-mongo/pom.xml
+++ b/connectors/rocketmq-connect-mongo/pom.xml
@@ -196,7 +196,7 @@
org.apache.rocketmq
rocketmq-openmessaging
- 4.3.2
+ 4.9.4
junit
diff --git a/connectors/rocketmq-connect-mqtt/pom.xml b/connectors/rocketmq-connect-mqtt/pom.xml
index 69003ebe..8ccd0ffb 100644
--- a/connectors/rocketmq-connect-mqtt/pom.xml
+++ b/connectors/rocketmq-connect-mqtt/pom.xml
@@ -32,7 +32,7 @@
1.8
1.8
- 4.7.1
+ 4.9.4
diff --git a/connectors/rocketmq-connect-rabbitmq/pom.xml b/connectors/rocketmq-connect-rabbitmq/pom.xml
index 64b7a40d..d53592d0 100644
--- a/connectors/rocketmq-connect-rabbitmq/pom.xml
+++ b/connectors/rocketmq-connect-rabbitmq/pom.xml
@@ -206,7 +206,7 @@
org.apache.rocketmq
rocketmq-openmessaging
- 4.3.2
+ 4.9.4
commons-cli
diff --git a/connectors/rocketmq-replicator/pom.xml b/connectors/rocketmq-replicator/pom.xml
index 15b71653..1ef16e7d 100644
--- a/connectors/rocketmq-replicator/pom.xml
+++ b/connectors/rocketmq-replicator/pom.xml
@@ -67,7 +67,7 @@
- 4.7.1
+ 4.9.4
0.1.3
4.13.1
3.2.4
diff --git a/metric-exporter/src/main/java/org/apache/rocketmq/connect/metrics/reporter/RocketMQClientUtil.java b/metric-exporter/src/main/java/org/apache/rocketmq/connect/metrics/reporter/RocketMQClientUtil.java
index e3587d69..114bd396 100644
--- a/metric-exporter/src/main/java/org/apache/rocketmq/connect/metrics/reporter/RocketMQClientUtil.java
+++ b/metric-exporter/src/main/java/org/apache/rocketmq/connect/metrics/reporter/RocketMQClientUtil.java
@@ -17,26 +17,25 @@
package org.apache.rocketmq.connect.metrics.reporter;
-import com.beust.jcommander.internal.Sets;
+import com.google.common.collect.Sets;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.TopicConfig;
-import org.apache.rocketmq.common.protocol.body.ClusterInfo;
-import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
-import org.apache.rocketmq.common.protocol.route.BrokerData;
-import org.apache.rocketmq.common.protocol.route.TopicRouteData;
-import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
+import org.apache.rocketmq.remoting.protocol.body.SubscriptionGroupWrapper;
+import org.apache.rocketmq.remoting.protocol.route.BrokerData;
+import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
+import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;
-import java.util.HashMap;
-import java.util.Set;
-import java.util.UUID;
-
/**
* rocket connect util
@@ -59,7 +58,7 @@ public static DefaultMQProducer initDefaultMQProducer(boolean aclEnabled,
String namesrvAddr) {
RPCHook rpcHook = null;
if (aclEnabled) {
- rpcHook = new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
+ rpcHook = getAclRPCHook(accessKey, secretKey);
}
DefaultMQProducer producer = new DefaultMQProducer(rpcHook);
producer.setNamesrvAddr(namesrvAddr);
@@ -94,7 +93,7 @@ public static void createTopic(DefaultMQAdminExt defaultMQAdminExt,
TopicConfig topicConfig) {
try {
ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
- HashMap> clusterAddrTable = clusterInfo.getClusterAddrTable();
+ Map> clusterAddrTable = clusterInfo.getClusterAddrTable();
Set clusterNameSet = clusterAddrTable.keySet();
for (String clusterName : clusterNameSet) {
Set masterSet = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
@@ -140,7 +139,7 @@ public static String createSubGroup(DefaultMQAdminExt defaultMQAdminExt, String
SubscriptionGroupConfig initConfig = new SubscriptionGroupConfig();
initConfig.setGroupName(subGroup);
ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
- HashMap> clusterAddrTable = clusterInfo.getClusterAddrTable();
+ Map> clusterAddrTable = clusterInfo.getClusterAddrTable();
Set clusterNameSet = clusterAddrTable.keySet();
for (String clusterName : clusterNameSet) {
Set masterSet = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
diff --git a/metric-exporter/src/main/java/org/apache/rocketmq/connect/metrics/reporter/RocketMQScheduledReporter.java b/metric-exporter/src/main/java/org/apache/rocketmq/connect/metrics/reporter/RocketMQScheduledReporter.java
index bc504c44..29229d99 100644
--- a/metric-exporter/src/main/java/org/apache/rocketmq/connect/metrics/reporter/RocketMQScheduledReporter.java
+++ b/metric-exporter/src/main/java/org/apache/rocketmq/connect/metrics/reporter/RocketMQScheduledReporter.java
@@ -16,9 +16,12 @@
*/
package org.apache.rocketmq.connect.metrics.reporter;
-
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.message.Message;
@@ -28,11 +31,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.nio.charset.StandardCharsets;
-import java.util.Map;
-import java.util.SortedMap;
-import java.util.concurrent.TimeUnit;
-
/**
* rocketmq exporter
*/
@@ -160,7 +158,9 @@ private void send(MetricName name, Double value) {
message.setTopic(this.topic);
message.setKeys(name.getStr());
message.setBody(value.toString().getBytes(StandardCharsets.UTF_8));
- producer.send(message);
+ if (producer != null) {
+ producer.send(message);
+ }
} catch (Exception e) {
log.error("Send metrics error", e);
}
@@ -169,6 +169,8 @@ private void send(MetricName name, Double value) {
@Override
public void close() {
super.close();
- producer.shutdown();
+ if (producer != null) {
+ producer.shutdown();
+ }
}
}
diff --git a/pom.xml b/pom.xml
index 2429e8b1..3a91a0a0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -41,7 +41,6 @@
- 4.7.1
4.13.1
3.22.0
4.7.0
@@ -57,7 +56,7 @@
2.13.4.1
4.4
- 4.7.1
+ 5.1.0
3.8.1
UTF-8
diff --git a/rocketmq-connect-runtime/pom.xml b/rocketmq-connect-runtime/pom.xml
index 82bd7a61..4ed44af4 100644
--- a/rocketmq-connect-runtime/pom.xml
+++ b/rocketmq-connect-runtime/pom.xml
@@ -42,9 +42,6 @@
1.8
1.8
-
- 4.7.1
-
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/AbstractStateManagementService.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/AbstractStateManagementService.java
index 153e5379..edad9f18 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/AbstractStateManagementService.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/AbstractStateManagementService.java
@@ -21,33 +21,29 @@
import io.openmessaging.connector.api.data.SchemaAndValue;
import io.openmessaging.connector.api.data.SchemaBuilder;
import io.openmessaging.connector.api.data.Struct;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import org.apache.rocketmq.connect.runtime.common.ConnAndTaskStatus;
import org.apache.rocketmq.connect.runtime.common.LoggerName;
import org.apache.rocketmq.connect.runtime.config.WorkerConfig;
import org.apache.rocketmq.connect.runtime.connectorwrapper.status.AbstractStatus;
import org.apache.rocketmq.connect.runtime.connectorwrapper.status.ConnectorStatus;
import org.apache.rocketmq.connect.runtime.connectorwrapper.status.TaskStatus;
-import org.apache.rocketmq.connect.runtime.serialization.Serdes;
import org.apache.rocketmq.connect.runtime.store.KeyValueStore;
import org.apache.rocketmq.connect.runtime.utils.Callback;
-import org.apache.rocketmq.connect.runtime.utils.ConnectUtil;
import org.apache.rocketmq.connect.runtime.utils.ConnectorTaskId;
import org.apache.rocketmq.connect.runtime.utils.Utils;
-import org.apache.rocketmq.connect.runtime.utils.datasync.BrokerBasedLog;
import org.apache.rocketmq.connect.runtime.utils.datasync.DataSynchronizer;
import org.apache.rocketmq.connect.runtime.utils.datasync.DataSynchronizerCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
/**
* State management service
*/
@@ -100,15 +96,7 @@ public void initialize(WorkerConfig config, RecordConverter converter) {
this.converter.configure(new HashMap<>());
this.statusTopic = config.getConnectStatusTopic();
this.dataSynchronizer = initializationDataSynchronizer(config);
-
- new BrokerBasedLog(config,
- this.statusTopic,
- ConnectUtil.createGroupName(statusManagePrefix, config.getWorkerId()),
- new StatusChangeCallback(),
- Serdes.serdeFrom(String.class),
- Serdes.serdeFrom(byte[].class),
- enabledCompactTopic()
- );
+
}
@Override
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementServiceImpl.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementServiceImpl.java
index 95be089a..6aeca0ee 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementServiceImpl.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementServiceImpl.java
@@ -22,14 +22,14 @@
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.TopicConfig;
-import org.apache.rocketmq.common.protocol.RequestCode;
-import org.apache.rocketmq.common.protocol.header.NotifyConsumerIdsChangedRequestHeader;
import org.apache.rocketmq.connect.runtime.common.LoggerName;
import org.apache.rocketmq.connect.runtime.config.WorkerConfig;
import org.apache.rocketmq.connect.runtime.utils.ConnectUtil;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.RequestCode;
+import org.apache.rocketmq.remoting.protocol.header.NotifyConsumerIdsChangedRequestHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -161,7 +161,7 @@ public RemotingCommand workerChanged(ChannelHandlerContext ctx,
workerChangeListener.onWorkerChange();
}
} catch (Exception e) {
- log.error("NotifyConsumerIdsChanged for connect exception", RemotingHelper.exceptionSimpleDesc(e));
+ log.error("NotifyConsumerIdsChanged for connect exception", e);
}
return null;
}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/stats/ConnectStatsManager.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/stats/ConnectStatsManager.java
index 854e6a83..e4450749 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/stats/ConnectStatsManager.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/stats/ConnectStatsManager.java
@@ -18,17 +18,16 @@
package org.apache.rocketmq.connect.runtime.stats;
import org.apache.commons.lang3.StringUtils;
-import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.stats.StatsItem;
import org.apache.rocketmq.common.stats.StatsItemSet;
import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.connect.runtime.config.WorkerConfig;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import static org.apache.rocketmq.connect.runtime.common.LoggerName.ROCKETMQ_CONNECT_STATS;
@@ -76,11 +75,8 @@ public class ConnectStatsManager {
/**
* read disk follow stats
*/
- private static final InternalLogger log = InternalLoggerFactory.getLogger(ROCKETMQ_CONNECT_STATS);
+ private static final Logger log = LoggerFactory.getLogger(ROCKETMQ_CONNECT_STATS);
-
- private static final InternalLogger COMMERCIAL_LOG = InternalLoggerFactory.getLogger(
- LoggerName.COMMERCIAL_LOGGER_NAME);
private final ScheduledExecutorService scheduledExecutorService =
ThreadUtils.newSingleThreadScheduledExecutor("ConnectStatsThread", true);
private final ScheduledExecutorService commercialExecutor =
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java
index d65849a3..e5321fe3 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java
@@ -17,10 +17,19 @@
package org.apache.rocketmq.connect.runtime.utils;
-import com.beust.jcommander.internal.Sets;
import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import io.openmessaging.connector.api.data.RecordOffset;
import io.openmessaging.connector.api.data.RecordPartition;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
@@ -30,54 +39,45 @@
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.common.KeyBuilder;
+import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll;
-import org.apache.rocketmq.common.admin.ConsumeStats;
-import org.apache.rocketmq.common.admin.OffsetWrapper;
-import org.apache.rocketmq.common.admin.TopicOffset;
-import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.common.protocol.ResponseCode;
-import org.apache.rocketmq.common.protocol.body.ClusterInfo;
-import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
-import org.apache.rocketmq.common.protocol.route.BrokerData;
-import org.apache.rocketmq.common.protocol.route.TopicRouteData;
-import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
-import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
+import org.apache.rocketmq.common.utils.NetworkUtil;
+import org.apache.rocketmq.connect.runtime.common.LoggerName;
import org.apache.rocketmq.connect.runtime.config.ConnectorConfig;
import org.apache.rocketmq.connect.runtime.config.WorkerConfig;
import org.apache.rocketmq.connect.runtime.service.strategy.AllocateConnAndTaskStrategy;
import org.apache.rocketmq.remoting.RPCHook;
-import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.remoting.protocol.ResponseCode;
+import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
+import org.apache.rocketmq.remoting.protocol.admin.OffsetWrapper;
+import org.apache.rocketmq.remoting.protocol.admin.TopicOffset;
+import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
+import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
+import org.apache.rocketmq.remoting.protocol.body.SubscriptionGroupWrapper;
+import org.apache.rocketmq.remoting.protocol.route.BrokerData;
+import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
+import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import static org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerSinkTask.QUEUE_OFFSET;
public class ConnectUtil {
-
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
public static final String SYS_TASK_CG_PREFIX = "connect-";
- private final static AtomicLong GROUP_POSTFIX_ID = new AtomicLong(0);
public static String createGroupName(String prefix) {
StringBuilder sb = new StringBuilder();
sb.append(prefix).append("-");
- sb.append(RemotingUtil.getLocalAddress()).append("-");
+ sb.append(NetworkUtil.getLocalAddress()).append("-");
sb.append(UtilAll.getPid()).append("-");
sb.append(System.nanoTime());
return sb.toString().replace(".", "-");
@@ -177,7 +177,7 @@ public static void createTopic(WorkerConfig connectConfig, TopicConfig topicConf
try {
defaultMQAdminExt = startMQAdminTool(connectConfig);
ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
- HashMap> clusterAddrTable = clusterInfo.getClusterAddrTable();
+ Map> clusterAddrTable = clusterInfo.getClusterAddrTable();
Set clusterNameSet = clusterAddrTable.keySet();
for (String clusterName : clusterNameSet) {
Set masterSet = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
@@ -248,7 +248,7 @@ public static String createSubGroup(WorkerConfig connectConfig, String subGroup)
SubscriptionGroupConfig initConfig = new SubscriptionGroupConfig();
initConfig.setGroupName(subGroup);
ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
- HashMap> clusterAddrTable = clusterInfo.getClusterAddrTable();
+ Map> clusterAddrTable = clusterInfo.getClusterAddrTable();
Set clusterNameSet = clusterAddrTable.keySet();
for (String clusterName : clusterNameSet) {
Set masterSet = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
@@ -347,26 +347,6 @@ private static RPCHook getAclRPCHook(String accessKey, String secretKey) {
return new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
}
- public static DefaultMQPullConsumer initDefaultMQPullConsumer(WorkerConfig connectConfig, ConnectorTaskId id, ConnectKeyValue keyValue) {
- RPCHook rpcHook = null;
- if (connectConfig.getAclEnable()) {
- rpcHook = new AclClientRPCHook(new SessionCredentials(connectConfig.getAccessKey(), connectConfig.getSecretKey()));
- }
- DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(rpcHook);
- consumer.setInstanceName(id.toString());
- String taskGroupId = keyValue.getString("task-group-id");
- if (StringUtils.isNotBlank(taskGroupId)) {
- consumer.setConsumerGroup(taskGroupId);
- } else {
- consumer.setConsumerGroup(SYS_TASK_CG_PREFIX + id.connector());
- }
- if (StringUtils.isNotBlank(connectConfig.getNamesrvAddr())) {
- consumer.setNamesrvAddr(connectConfig.getNamesrvAddr());
- }
- return consumer;
- }
-
-
/**
* Get topic offsets
*/
@@ -377,14 +357,11 @@ public static Map> offsetTopics(
try {
adminClient = startMQAdminTool(config);
for (String topic : topics) {
- TopicStatsTable topicStatsTable = adminClient.examineTopicStats(topic);
+ TopicStatsTable topicStatsTable = examineTopicStats(adminClient, topic);
offsets.put(topic, topicStatsTable.getOffsetTable());
}
return offsets;
- } catch (MQClientException
- | MQBrokerException
- | RemotingException
- | InterruptedException e) {
+ } catch (Exception e) {
throw new RuntimeException(e);
} finally {
if (adminClient != null) {
@@ -393,40 +370,6 @@ public static Map> offsetTopics(
}
}
- /** Flat topics offsets */
- public static Map flatOffsetTopics(
- WorkerConfig config, List topics) {
- Map messageQueueTopicOffsets = Maps.newConcurrentMap();
- offsetTopics(config, topics).values()
- .forEach(
- offsetTopic -> {
- messageQueueTopicOffsets.putAll(offsetTopic);
- });
- return messageQueueTopicOffsets;
- }
-
- /** Search offsets by timestamp */
- public static Map searchOffsetsByTimestamp(
- WorkerConfig config,
- Collection messageQueues,
- Long timestamp) {
- Map offsets = Maps.newConcurrentMap();
- DefaultMQAdminExt adminClient = null;
- try {
- adminClient = startMQAdminTool(config);
- for (MessageQueue messageQueue : messageQueues) {
- long offset = adminClient.searchOffset(messageQueue, timestamp);
- offsets.put(messageQueue, offset);
- }
- return offsets;
- } catch (MQClientException e) {
- throw new RuntimeException(e);
- } finally {
- if (adminClient != null) {
- adminClient.shutdown();
- }
- }
- }
/** Get consumer group offset */
public static Map currentOffsets(WorkerConfig config, String groupName, List topics, Set messageQueues) {
@@ -436,7 +379,7 @@ public static Map currentOffsets(WorkerConfig config, String
adminClient = startMQAdminTool(config);
Map consumerOffsets = Maps.newConcurrentMap();
for (String topic : topics) {
- ConsumeStats consumeStats = adminClient.examineConsumeStats(groupName, topic);
+ ConsumeStats consumeStats = examineConsumeStats(adminClient, groupName, topic);
consumerOffsets.putAll(consumeStats.getOffsetTable());
}
return consumerOffsets.keySet().stream()
@@ -446,12 +389,9 @@ public static Map currentOffsets(WorkerConfig config, String
messageQueue -> messageQueue,
messageQueue ->
consumerOffsets.get(messageQueue).getConsumerOffset()));
- } catch (MQClientException
- | MQBrokerException
- | RemotingException
- | InterruptedException e) {
+ } catch (MQClientException e) {
if (e instanceof MQClientException) {
- if (((MQClientException) e).getResponseCode() == ResponseCode.TOPIC_NOT_EXIST) {
+ if (e.getResponseCode() == ResponseCode.TOPIC_NOT_EXIST) {
return Collections.emptyMap();
} else {
throw new RuntimeException(e);
@@ -466,4 +406,161 @@ public static Map currentOffsets(WorkerConfig config, String
}
}
+ /**
+ * Compatible with 4.9.4 and earlier
+ *
+ * @param adminClient
+ * @param topic
+ * @return
+ */
+ private static TopicStatsTable examineTopicStats(DefaultMQAdminExt adminClient, String topic) {
+ try {
+ return adminClient.examineTopicStats(topic);
+ } catch (MQBrokerException e) {
+ // Compatible with 4.9.4 and earlier
+ if (e.getResponseCode() == ResponseCode.REQUEST_CODE_NOT_SUPPORTED) {
+ try {
+ log.warn("Examine topic stats failure , the server version is less than 5.1.0, and downward compatibility begins, {}", e.getErrorMessage());
+ return overrideExamineTopicStats(adminClient, topic);
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ } else {
+ throw new RuntimeException(e);
+ }
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ /**
+ * examineConsumeStats
+ * Compatible with 4.9.4 and earlier
+ *
+ * @param adminClient
+ * @param topic
+ * @return
+ */
+ private static ConsumeStats examineConsumeStats(DefaultMQAdminExt adminClient, String groupName, String topic) {
+ try {
+ return adminClient.examineConsumeStats(groupName, topic);
+ } catch (MQBrokerException e) {
+ // Compatible with 4.9.4 and earlier
+ if (e.getResponseCode() == ResponseCode.REQUEST_CODE_NOT_SUPPORTED) {
+ try {
+ log.warn("Examine consume stats failure, the server version is less than 5.1.0, and downward compatibility begins {}", e.getErrorMessage());
+ return overrideExamineConsumeStats(adminClient, groupName, topic);
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ } else {
+ throw new RuntimeException(e);
+ }
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ /**
+ * Compatible with version 4.9.4
+ *
+ * @param adminClient
+ * @param topic
+ * @return
+ * @throws RemotingException
+ * @throws InterruptedException
+ * @throws MQClientException
+ * @throws MQBrokerException
+ */
+ private static TopicStatsTable overrideExamineTopicStats(DefaultMQAdminExt adminClient,
+ String topic) throws RemotingException, InterruptedException, MQClientException, MQBrokerException {
+ TopicRouteData topicRouteData = adminClient.examineTopicRouteInfo(topic);
+ TopicStatsTable topicStatsTable = new TopicStatsTable();
+ for (BrokerData bd : topicRouteData.getBrokerDatas()) {
+ String addr = bd.selectBrokerAddr();
+ if (addr != null) {
+ TopicStatsTable tst = adminClient
+ .getDefaultMQAdminExtImpl()
+ .getMqClientInstance()
+ .getMQClientAPIImpl()
+ .getTopicStatsInfo(addr, topic, 5000);
+ topicStatsTable.getOffsetTable().putAll(tst.getOffsetTable());
+ }
+ }
+ return topicStatsTable;
+ }
+
+ /**
+ * Compatible with version 4.9.4
+ *
+ * @param adminExt
+ * @param groupName
+ * @param topic
+ * @return
+ * @throws MQClientException
+ * @throws RemotingException
+ * @throws InterruptedException
+ * @throws MQBrokerException
+ */
+ private static ConsumeStats overrideExamineConsumeStats(DefaultMQAdminExt adminExt, String groupName,
+ String topic) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
+ TopicRouteData topicRouteData = null;
+ List routeTopics = new ArrayList<>();
+ routeTopics.add(MixAll.getRetryTopic(groupName));
+ if (topic != null) {
+ routeTopics.add(topic);
+ routeTopics.add(KeyBuilder.buildPopRetryTopic(topic, groupName));
+ }
+ for (int i = 0; i < routeTopics.size(); i++) {
+ try {
+ topicRouteData = adminExt.getDefaultMQAdminExtImpl().examineTopicRouteInfo(routeTopics.get(i));
+ if (topicRouteData != null) {
+ break;
+ }
+ } catch (Throwable e) {
+ if (i == routeTopics.size() - 1) {
+ throw e;
+ }
+ }
+ }
+ ConsumeStats result = new ConsumeStats();
+
+ for (BrokerData bd : topicRouteData.getBrokerDatas()) {
+ String addr = bd.selectBrokerAddr();
+ if (addr != null) {
+ ConsumeStats consumeStats = adminExt.getDefaultMQAdminExtImpl().getMqClientInstance().getMQClientAPIImpl().getConsumeStats(addr, groupName, topic, 5000 * 3);
+ result.getOffsetTable().putAll(consumeStats.getOffsetTable());
+ double value = result.getConsumeTps() + consumeStats.getConsumeTps();
+ result.setConsumeTps(value);
+ }
+ }
+
+ Set topics = Sets.newHashSet();
+ for (MessageQueue messageQueue : result.getOffsetTable().keySet()) {
+ topics.add(messageQueue.getTopic());
+ }
+
+ ConsumeStats staticResult = new ConsumeStats();
+ staticResult.setConsumeTps(result.getConsumeTps());
+
+ for (String currentTopic : topics) {
+ TopicRouteData currentRoute = adminExt.getDefaultMQAdminExtImpl().examineTopicRouteInfo(currentTopic);
+ if (currentRoute.getTopicQueueMappingByBroker() == null
+ || currentRoute.getTopicQueueMappingByBroker().isEmpty()) {
+ //normal topic
+ for (Map.Entry entry : result.getOffsetTable().entrySet()) {
+ if (entry.getKey().getTopic().equals(currentTopic)) {
+ staticResult.getOffsetTable().put(entry.getKey(), entry.getValue());
+ }
+ }
+ }
+ }
+
+ if (staticResult.getOffsetTable().isEmpty()) {
+ throw new MQClientException(ResponseCode.CONSUMER_NOT_ONLINE, "Not found the consumer group consume stats, because return offset table is empty, maybe the consumer not consume any message");
+ }
+
+ return staticResult;
+ }
+
}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/datasync/BrokerBasedLog.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/datasync/BrokerBasedLog.java
index 258adeeb..01eda152 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/datasync/BrokerBasedLog.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/datasync/BrokerBasedLog.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.connect.runtime.utils.datasync;
+import com.google.common.collect.Maps;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -34,7 +35,6 @@
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.selector.SelectMessageQueueByHash;
import org.apache.rocketmq.common.TopicConfig;
-import org.apache.rocketmq.common.admin.TopicOffset;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
@@ -46,6 +46,7 @@
import org.apache.rocketmq.connect.runtime.utils.Callback;
import org.apache.rocketmq.connect.runtime.utils.ConnectUtil;
import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.protocol.admin.TopicOffset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -137,6 +138,11 @@ private void prepare() {
if (!ConnectUtil.isTopicExist(workerConfig, topicName)) {
log.info("Try to create store topic: {}!", topicName);
TopicConfig topicConfig = new TopicConfig(topicName, 1, 1, PermName.PERM_READ | PermName.PERM_WRITE);
+ if (enabledCompactTopic) {
+ Map attributes = Maps.newConcurrentMap();
+ attributes.put("+cleanup.policy", "COMPACTION");
+ topicConfig.setAttributes(attributes);
+ }
ConnectUtil.createTopic(workerConfig, topicConfig);
}
}
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/ServerResponseMocker.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/ServerResponseMocker.java
index 16f3ad40..db2fb2aa 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/ServerResponseMocker.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/ServerResponseMocker.java
@@ -30,10 +30,7 @@
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.Future;
-import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.common.protocol.RequestCode;
-import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
@@ -42,17 +39,20 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
-import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseBody;
-import org.apache.rocketmq.common.protocol.route.BrokerData;
import java.util.concurrent.ConcurrentHashMap;
-import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
-import org.apache.rocketmq.common.protocol.route.QueueData;
-import org.apache.rocketmq.common.protocol.route.TopicRouteData;
-import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.remoting.netty.NettyDecoder;
import org.apache.rocketmq.remoting.netty.NettyEncoder;
+import org.apache.rocketmq.remoting.protocol.DataVersion;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode;
+import org.apache.rocketmq.remoting.protocol.RequestCode;
+import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
+import org.apache.rocketmq.remoting.protocol.body.SubscriptionGroupWrapper;
+import org.apache.rocketmq.remoting.protocol.header.GetConsumerListByGroupResponseBody;
+import org.apache.rocketmq.remoting.protocol.route.BrokerData;
+import org.apache.rocketmq.remoting.protocol.route.QueueData;
+import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
+import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.junit.After;
import org.junit.Before;
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java
index 5570be21..01ac4735 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java
@@ -17,9 +17,20 @@
package org.apache.rocketmq.connect.runtime.connectorwrapper;
+import com.google.common.collect.Sets;
import io.openmessaging.connector.api.component.connector.ConnectorContext;
import io.openmessaging.connector.api.data.ConnectRecord;
import io.openmessaging.internal.DefaultKeyValue;
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
import org.apache.rocketmq.connect.runtime.config.ConnectorConfig;
@@ -45,28 +56,21 @@
import org.apache.rocketmq.connect.runtime.service.local.LocalStateManagementServiceImpl;
import org.apache.rocketmq.connect.runtime.stats.ConnectStatsManager;
import org.apache.rocketmq.connect.runtime.stats.ConnectStatsService;
+import org.apache.rocketmq.connect.runtime.utils.ConnectUtil;
import org.apache.rocketmq.connect.runtime.utils.ConnectorTaskId;
import org.apache.rocketmq.connect.runtime.utils.TestUtils;
+import org.apache.rocketmq.connect.runtime.utils.datasync.DataSynchronizer;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
+import org.mockito.MockedStatic;
import org.mockito.junit.MockitoJUnitRunner;
-import java.io.File;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
@@ -75,9 +79,6 @@ public class WorkerTest {
@Mock
private PositionManagementService positionManagementService;
- @Mock
- private PositionManagementService offsetManagementService;
-
@Mock
private ConfigManagementService configManagementService;
@@ -111,12 +112,18 @@ public class WorkerTest {
private WrapperStatusListener wrapperStatusListener;
+ @Mock
private StateManagementService stateManagementService;
private ServerResponseMocker nameServerMocker;
private ServerResponseMocker brokerMocker;
+ @Mock
+ protected DataSynchronizer dataSynchronizer;
+
+ private final MockedStatic connectUtil = mockStatic(ConnectUtil.class);
+
@Before
public void init() {
nameServerMocker = NameServerMocker.startByDefaultConf(9876, 10911);
@@ -125,6 +132,7 @@ public void init() {
when(plugin.newConnector(any())).thenReturn(new TestConnector());
when(plugin.delegatingLoader()).thenReturn(delegatingClassLoader);
when(delegatingClassLoader.pluginClassLoader(any())).thenReturn(pluginClassLoader);
+ connectUtil.when(() -> ConnectUtil.fetchAllConsumerGroupList(connectConfig)).thenReturn(Sets.newHashSet());
Thread.currentThread().setContextClassLoader(pluginClassLoader);
connectConfig = new WorkerConfig();
@@ -179,6 +187,7 @@ public void init() {
@After
public void destroy() throws InterruptedException {
+ connectUtil.close();
TimeUnit.SECONDS.sleep(2);
worker.stop();
TestUtils.deleteFile(new File(System.getProperty("user.home") + File.separator + "testConnectorStore"));