Skip to content

Commit

Permalink
Adding the COMPACTION attribute is supported when creating a topic in…
Browse files Browse the repository at this point in the history
… the BrokerBasedLog #436
  • Loading branch information
sunxiaojian committed Mar 7, 2023
1 parent 6ffc39e commit 47f0aac
Show file tree
Hide file tree
Showing 26 changed files with 356 additions and 210 deletions.
2 changes: 1 addition & 1 deletion connectors/rocketmq-connect-activemq/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-openmessaging</artifactId>
<version>4.3.2</version>
<version>4.9.4</version>
</dependency>
<dependency>
<groupId>commons-cli</groupId>
Expand Down
2 changes: 1 addition & 1 deletion connectors/rocketmq-connect-cassandra/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-openmessaging</artifactId>
<version>4.3.2</version>
<version>4.9.4</version>
</dependency>
<dependency>
<groupId>commons-cli</groupId>
Expand Down
3 changes: 1 addition & 2 deletions connectors/rocketmq-connect-debezium/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,7 @@
<debezium.version>1.7.2.Final</debezium.version>
<debezium.postgresql.version>42.3.3</debezium.postgresql.version>
<!--rocketmq version-->
<rocketmq.version>4.7.1</rocketmq.version>
<rocketmq-openmessaging.version>4.3.2</rocketmq-openmessaging.version>
<rocketmq.version>5.1.0</rocketmq.version>

<!--rocketmq connect version-->
<openmessaging-connector.version>0.1.4</openmessaging-connector.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@
*/
package org.apache.rocketmq.connect.debezium;

import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
Expand All @@ -24,28 +29,22 @@
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.admin.TopicOffset;
import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.remoting.protocol.admin.TopicOffset;
import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.remoting.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;

/**
* Tools for creating RocketMq topic and group
*/
Expand Down Expand Up @@ -130,7 +129,7 @@ public static void createTopic(RocketMqConfig config, TopicConfig topicConfig) {
try {
defaultMQAdminExt = startMQAdminTool(config);
ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
HashMap<String, Set<String>> clusterAddrTable = clusterInfo.getClusterAddrTable();
Map<String, Set<String>> clusterAddrTable = clusterInfo.getClusterAddrTable();
Set<String> clusterNameSet = clusterAddrTable.keySet();
for (String clusterName : clusterNameSet) {
Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
Expand Down Expand Up @@ -201,7 +200,7 @@ public static String createSubGroup(RocketMqConfig connectConfig, String subGrou
SubscriptionGroupConfig initConfig = new SubscriptionGroupConfig();
initConfig.setGroupName(subGroup);
ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
HashMap<String, Set<String>> clusterAddrTable = clusterInfo.getClusterAddrTable();
Map<String, Set<String>> clusterAddrTable = clusterInfo.getClusterAddrTable();
Set<String> clusterNameSet = clusterAddrTable.keySet();
for (String clusterName : clusterNameSet) {
Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
Expand Down Expand Up @@ -231,9 +230,9 @@ public static Map<MessageQueue, TopicOffset> offsets(RocketMqConfig config, Stri
DefaultMQAdminExt adminClient = null;
try {
adminClient = RocketMqAdminUtil.startMQAdminTool(config);
TopicStatsTable topicStatsTable = adminClient.examineTopicStats(topic);
TopicStatsTable topicStatsTable = examineTopicStats(adminClient,topic);
return topicStatsTable.getOffsetTable();
} catch (MQClientException | MQBrokerException | RemotingException | InterruptedException e) {
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
if (adminClient != null) {
Expand All @@ -242,4 +241,59 @@ public static Map<MessageQueue, TopicOffset> offsets(RocketMqConfig config, Stri
}
}

/**
* Compatible with 4.9.4 and earlier
*
* @param adminClient
* @param topic
* @return
*/
private static TopicStatsTable examineTopicStats(DefaultMQAdminExt adminClient, String topic) {
try {
return adminClient.examineTopicStats(topic);
} catch (MQBrokerException e) {
// Compatible with 4.9.4 and earlier
if (e.getResponseCode() == ResponseCode.REQUEST_CODE_NOT_SUPPORTED) {
try {
return overrideExamineTopicStats(adminClient, topic);
} catch (Exception ex) {
throw new RuntimeException(ex);
}
} else {
throw new RuntimeException(e);
}
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}

/**
* Compatible with version 4.9.4
*
* @param adminClient
* @param topic
* @return
* @throws RemotingException
* @throws InterruptedException
* @throws MQClientException
* @throws MQBrokerException
*/
private static TopicStatsTable overrideExamineTopicStats(DefaultMQAdminExt adminClient,
String topic) throws RemotingException, InterruptedException, MQClientException, MQBrokerException {
TopicRouteData topicRouteData = adminClient.examineTopicRouteInfo(topic);
TopicStatsTable topicStatsTable = new TopicStatsTable();
for (BrokerData bd : topicRouteData.getBrokerDatas()) {
String addr = bd.selectBrokerAddr();
if (addr != null) {
TopicStatsTable tst = adminClient
.getDefaultMQAdminExtImpl()
.getMqClientInstance()
.getMQClientAPIImpl()
.getTopicStatsInfo(addr, topic, 5000);
topicStatsTable.getOffsetTable().putAll(tst.getOffsetTable());
}
}
return topicStatsTable;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.admin.TopicOffset;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.connect.kafka.connect.adaptor.task.AbstractKafkaConnectSource;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.protocol.admin.TopicOffset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
2 changes: 1 addition & 1 deletion connectors/rocketmq-connect-deltalake/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-openmessaging</artifactId>
<version>4.3.2</version>
<version>4.9.4</version>
</dependency>

<dependency>
Expand Down
2 changes: 1 addition & 1 deletion connectors/rocketmq-connect-hudi/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-hudi-connector-name}/
* **spark-submit 启动任务**
将connect-runtime打包后通过spark-submit提交任务
```
nohup sh spark-submit --class org.apache.rocketmq.connect.runtime.DistributedConnectStartup --conf "spark.driver.extraJavaOptions=-Dlogback.configurationFile=logback.xml" --files /xxx/conf/connect.conf,/xxx/conf/log4j.properties --packages org.apache.hudi:hudi-spark3-bundle_2.12:0.8.0,org.apache.spark:spark-avro_2.12:3.0.1,org.apache.hudi:hudi-java-client:0.8.0,org.apache.parquet:parquet-avro:1.10.1,org.apache.avro:avro:1.10.2,com.alibaba:fastjson:1.2.51,org.reflections:reflections:0.9.11,org.apache.httpcomponents:httpclient:4.5.5,io.openmessaging:openmessaging-connector:0.1.1,commons-cli:commons-cli:1.1,org.apache.rocketmq:rocketmq-client:4.4.0,org.apache.rocketmq:rocketmq-tools:4.4.0,org.apache.rocketmq:rocketmq-remoting:4.4.0,org.apache.rocketmq:rocketmq-openmessaging:4.3.2,org.slf4j:slf4j-api:1.7.7,com.google.guava:guava:20.0,org.apache.hadoop:hadoop-common:3.3.1,org.reflections:reflections:0.9.12,org.apache.hive:hive-exec:2.3.7 --conf 'spark.executor.userClassPathFirst=true' --conf 'spark.driver.userClassPathFirst=true' --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' /xxx/rocketmq/rocketmq-connect-runtime-0.0.1-SNAPSHOT.jar &
nohup sh spark-submit --class org.apache.rocketmq.connect.runtime.DistributedConnectStartup --conf "spark.driver.extraJavaOptions=-Dlogback.configurationFile=logback.xml" --files /xxx/conf/connect.conf,/xxx/conf/log4j.properties --packages org.apache.hudi:hudi-spark3-bundle_2.12:0.8.0,org.apache.spark:spark-avro_2.12:3.0.1,org.apache.hudi:hudi-java-client:0.8.0,org.apache.parquet:parquet-avro:1.10.1,org.apache.avro:avro:1.10.2,com.alibaba:fastjson:1.2.51,org.reflections:reflections:0.9.11,org.apache.httpcomponents:httpclient:4.5.5,io.openmessaging:openmessaging-connector:0.1.1,commons-cli:commons-cli:1.1,org.apache.rocketmq:rocketmq-client:4.4.0,org.apache.rocketmq:rocketmq-tools:4.4.0,org.apache.rocketmq:rocketmq-remoting:4.4.0,org.apache.rocketmq:rocketmq-openmessaging:4.9.4,org.slf4j:slf4j-api:1.7.7,com.google.guava:guava:20.0,org.apache.hadoop:hadoop-common:3.3.1,org.reflections:reflections:0.9.12,org.apache.hive:hive-exec:2.3.7 --conf 'spark.executor.userClassPathFirst=true' --conf 'spark.driver.userClassPathFirst=true' --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' /xxx/rocketmq/rocketmq-connect-runtime-0.0.1-SNAPSHOT.jar &
```
后续操作参考rocketmq-connect-hudi启动步骤

Expand Down
4 changes: 2 additions & 2 deletions connectors/rocketmq-connect-hudi/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
<!-- Compiler settings properties -->
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<rocketmq.version>4.7.1</rocketmq.version>
<rocketmq.version>4.9.4</rocketmq.version>

<hudi.version>0.8.0</hudi.version>
<avro.version>1.10.2</avro.version>
Expand Down Expand Up @@ -231,7 +231,7 @@
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-openmessaging</artifactId>
<version>4.3.2</version>
<version>4.9.4</version>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
Expand Down
2 changes: 1 addition & 1 deletion connectors/rocketmq-connect-jdbc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
<!-- Compiler settings properties -->
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<rocketmq.version>4.7.1</rocketmq.version>
<rocketmq.version>4.9.4</rocketmq.version>

<!--test jar-->
<junit.version>4.13.1</junit.version>
Expand Down
2 changes: 1 addition & 1 deletion connectors/rocketmq-connect-jms/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-openmessaging</artifactId>
<version>4.3.2</version>
<version>4.9.4</version>
</dependency>
<dependency>
<groupId>commons-cli</groupId>
Expand Down
2 changes: 1 addition & 1 deletion connectors/rocketmq-connect-kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-openmessaging</artifactId>
<version>4.3.2</version>
<version>4.9.4</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
Expand Down
2 changes: 1 addition & 1 deletion connectors/rocketmq-connect-mongo/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-openmessaging</artifactId>
<version>4.3.2</version>
<version>4.9.4</version>
</dependency>
<dependency>
<groupId>junit</groupId>
Expand Down
2 changes: 1 addition & 1 deletion connectors/rocketmq-connect-mqtt/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
<!-- Compiler settings properties -->
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<rocketmq.version>4.7.1</rocketmq.version>
<rocketmq.version>4.9.4</rocketmq.version>
</properties>

<build>
Expand Down
2 changes: 1 addition & 1 deletion connectors/rocketmq-connect-rabbitmq/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-openmessaging</artifactId>
<version>4.3.2</version>
<version>4.9.4</version>
</dependency>
<dependency>
<groupId>commons-cli</groupId>
Expand Down
2 changes: 1 addition & 1 deletion connectors/rocketmq-replicator/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
</build>

<properties>
<rocketmq.version>4.7.1</rocketmq.version>
<rocketmq.version>4.9.4</rocketmq.version>
<openmessaging.connector.version>0.1.3</openmessaging.connector.version>
<junit.version>4.13.1</junit.version>
<mockito.version>3.2.4</mockito.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,25 @@

package org.apache.rocketmq.connect.metrics.reporter;

import com.beust.jcommander.internal.Sets;
import com.google.common.collect.Sets;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.remoting.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;

import java.util.HashMap;
import java.util.Set;
import java.util.UUID;


/**
* rocket connect util
Expand All @@ -59,7 +58,7 @@ public static DefaultMQProducer initDefaultMQProducer(boolean aclEnabled,
String namesrvAddr) {
RPCHook rpcHook = null;
if (aclEnabled) {
rpcHook = new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
rpcHook = getAclRPCHook(accessKey, secretKey);
}
DefaultMQProducer producer = new DefaultMQProducer(rpcHook);
producer.setNamesrvAddr(namesrvAddr);
Expand Down Expand Up @@ -94,7 +93,7 @@ public static void createTopic(DefaultMQAdminExt defaultMQAdminExt,
TopicConfig topicConfig) {
try {
ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
HashMap<String, Set<String>> clusterAddrTable = clusterInfo.getClusterAddrTable();
Map<String, Set<String>> clusterAddrTable = clusterInfo.getClusterAddrTable();
Set<String> clusterNameSet = clusterAddrTable.keySet();
for (String clusterName : clusterNameSet) {
Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
Expand Down Expand Up @@ -140,7 +139,7 @@ public static String createSubGroup(DefaultMQAdminExt defaultMQAdminExt, String
SubscriptionGroupConfig initConfig = new SubscriptionGroupConfig();
initConfig.setGroupName(subGroup);
ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
HashMap<String, Set<String>> clusterAddrTable = clusterInfo.getClusterAddrTable();
Map<String, Set<String>> clusterAddrTable = clusterInfo.getClusterAddrTable();
Set<String> clusterNameSet = clusterAddrTable.keySet();
for (String clusterName : clusterNameSet) {
Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@
*/
package org.apache.rocketmq.connect.metrics.reporter;


import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.SortedMap;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.message.Message;
Expand All @@ -28,11 +31,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.SortedMap;
import java.util.concurrent.TimeUnit;

/**
* rocketmq exporter
*/
Expand Down Expand Up @@ -160,7 +158,9 @@ private void send(MetricName name, Double value) {
message.setTopic(this.topic);
message.setKeys(name.getStr());
message.setBody(value.toString().getBytes(StandardCharsets.UTF_8));
producer.send(message);
if (producer != null) {
producer.send(message);
}
} catch (Exception e) {
log.error("Send metrics error", e);
}
Expand All @@ -169,6 +169,8 @@ private void send(MetricName name, Double value) {
@Override
public void close() {
super.close();
producer.shutdown();
if (producer != null) {
producer.shutdown();
}
}
}
Loading

0 comments on commit 47f0aac

Please sign in to comment.