Skip to content

Commit

Permalink
Adding the COMPACTION attribute is supported when creating a topic in…
Browse files Browse the repository at this point in the history
… the BrokerBasedLog #436
  • Loading branch information
sunxiaojian committed Mar 3, 2023
1 parent 6ffc39e commit eebbf66
Show file tree
Hide file tree
Showing 8 changed files with 50 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@

package org.apache.rocketmq.connect.metrics.reporter;

import com.beust.jcommander.internal.Sets;
import com.google.common.collect.Sets;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.exception.MQClientException;
Expand All @@ -33,10 +36,6 @@
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;

import java.util.HashMap;
import java.util.Set;
import java.util.UUID;


/**
* rocket connect util
Expand Down Expand Up @@ -94,7 +93,7 @@ public static void createTopic(DefaultMQAdminExt defaultMQAdminExt,
TopicConfig topicConfig) {
try {
ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
HashMap<String, Set<String>> clusterAddrTable = clusterInfo.getClusterAddrTable();
Map<String, Set<String>> clusterAddrTable = clusterInfo.getClusterAddrTable();
Set<String> clusterNameSet = clusterAddrTable.keySet();
for (String clusterName : clusterNameSet) {
Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
Expand Down Expand Up @@ -140,7 +139,7 @@ public static String createSubGroup(DefaultMQAdminExt defaultMQAdminExt, String
SubscriptionGroupConfig initConfig = new SubscriptionGroupConfig();
initConfig.setGroupName(subGroup);
ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
HashMap<String, Set<String>> clusterAddrTable = clusterInfo.getClusterAddrTable();
Map<String, Set<String>> clusterAddrTable = clusterInfo.getClusterAddrTable();
Set<String> clusterNameSet = clusterAddrTable.keySet();
for (String clusterName : clusterNameSet) {
Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@
*/
package org.apache.rocketmq.connect.metrics.reporter;


import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.SortedMap;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.message.Message;
Expand All @@ -28,11 +31,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.SortedMap;
import java.util.concurrent.TimeUnit;

/**
* rocketmq exporter
*/
Expand Down Expand Up @@ -160,7 +158,9 @@ private void send(MetricName name, Double value) {
message.setTopic(this.topic);
message.setKeys(name.getStr());
message.setBody(value.toString().getBytes(StandardCharsets.UTF_8));
producer.send(message);
if (producer != null) {
producer.send(message);
}
} catch (Exception e) {
log.error("Send metrics error", e);
}
Expand All @@ -169,6 +169,8 @@ private void send(MetricName name, Double value) {
@Override
public void close() {
super.close();
producer.shutdown();
if (producer != null) {
producer.shutdown();
}
}
}
3 changes: 1 addition & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
</license>
</licenses>
<properties>
<rocketmq.version>4.7.1</rocketmq.version>
<junit.version>4.13.1</junit.version>
<assertj.version>3.22.0</assertj.version>
<mockito.version>4.7.0</mockito.version>
Expand All @@ -57,7 +56,7 @@
<jackson.version>2.13.4.1</jackson.version>
<commons-collections4.version>4.4</commons-collections4.version>
<!-- RocketMQ Version-->
<rocketmq.version>4.7.1</rocketmq.version>
<rocketmq.version>5.0.0</rocketmq.version>
<maven-artifact.version>3.8.1</maven-artifact.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<!--maven properties -->
Expand Down
3 changes: 0 additions & 3 deletions rocketmq-connect-runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,6 @@
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>

<!-- RocketMQ Version-->
<rocketmq.version>4.7.1</rocketmq.version>

</properties>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,15 +100,7 @@ public void initialize(WorkerConfig config, RecordConverter converter) {
this.converter.configure(new HashMap<>());
this.statusTopic = config.getConnectStatusTopic();
this.dataSynchronizer = initializationDataSynchronizer(config);

new BrokerBasedLog(config,
this.statusTopic,
ConnectUtil.createGroupName(statusManagePrefix, config.getWorkerId()),
new StatusChangeCallback(),
Serdes.serdeFrom(String.class),
Serdes.serdeFrom(byte[].class),
enabledCompactTopic()
);

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

package org.apache.rocketmq.connect.runtime.utils;

import com.beust.jcommander.internal.Sets;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.openmessaging.connector.api.data.RecordOffset;
import io.openmessaging.connector.api.data.RecordPartition;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -177,7 +177,7 @@ public static void createTopic(WorkerConfig connectConfig, TopicConfig topicConf
try {
defaultMQAdminExt = startMQAdminTool(connectConfig);
ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
HashMap<String, Set<String>> clusterAddrTable = clusterInfo.getClusterAddrTable();
Map<String, Set<String>> clusterAddrTable = clusterInfo.getClusterAddrTable();
Set<String> clusterNameSet = clusterAddrTable.keySet();
for (String clusterName : clusterNameSet) {
Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
Expand Down Expand Up @@ -248,7 +248,7 @@ public static String createSubGroup(WorkerConfig connectConfig, String subGroup)
SubscriptionGroupConfig initConfig = new SubscriptionGroupConfig();
initConfig.setGroupName(subGroup);
ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
HashMap<String, Set<String>> clusterAddrTable = clusterInfo.getClusterAddrTable();
Map<String, Set<String>> clusterAddrTable = clusterInfo.getClusterAddrTable();
Set<String> clusterNameSet = clusterAddrTable.keySet();
for (String clusterName : clusterNameSet) {
Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.rocketmq.connect.runtime.utils.datasync;

import com.google.common.collect.Maps;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -137,6 +138,11 @@ private void prepare() {
if (!ConnectUtil.isTopicExist(workerConfig, topicName)) {
log.info("Try to create store topic: {}!", topicName);
TopicConfig topicConfig = new TopicConfig(topicName, 1, 1, PermName.PERM_READ | PermName.PERM_WRITE);
if (enabledCompactTopic) {
Map<String, String> attributes = Maps.newConcurrentMap();
attributes.put("+cleanup.policy", "COMPACTION");
topicConfig.setAttributes(attributes);
}
ConnectUtil.createTopic(workerConfig, topicConfig);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,20 @@

package org.apache.rocketmq.connect.runtime.connectorwrapper;

import com.google.common.collect.Sets;
import io.openmessaging.connector.api.component.connector.ConnectorContext;
import io.openmessaging.connector.api.data.ConnectRecord;
import io.openmessaging.internal.DefaultKeyValue;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
import org.apache.rocketmq.connect.runtime.config.ConnectorConfig;
Expand All @@ -45,28 +56,21 @@
import org.apache.rocketmq.connect.runtime.service.local.LocalStateManagementServiceImpl;
import org.apache.rocketmq.connect.runtime.stats.ConnectStatsManager;
import org.apache.rocketmq.connect.runtime.stats.ConnectStatsService;
import org.apache.rocketmq.connect.runtime.utils.ConnectUtil;
import org.apache.rocketmq.connect.runtime.utils.ConnectorTaskId;
import org.apache.rocketmq.connect.runtime.utils.TestUtils;
import org.apache.rocketmq.connect.runtime.utils.datasync.DataSynchronizer;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.MockedStatic;
import org.mockito.junit.MockitoJUnitRunner;

import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.when;

@RunWith(MockitoJUnitRunner.class)
Expand All @@ -75,9 +79,6 @@ public class WorkerTest {
@Mock
private PositionManagementService positionManagementService;

@Mock
private PositionManagementService offsetManagementService;

@Mock
private ConfigManagementService configManagementService;

Expand Down Expand Up @@ -111,12 +112,18 @@ public class WorkerTest {

private WrapperStatusListener wrapperStatusListener;

@Mock
private StateManagementService stateManagementService;

private ServerResponseMocker nameServerMocker;

private ServerResponseMocker brokerMocker;

@Mock
protected DataSynchronizer dataSynchronizer;

private final MockedStatic<ConnectUtil> connectUtil = mockStatic(ConnectUtil.class);

@Before
public void init() {
nameServerMocker = NameServerMocker.startByDefaultConf(9876, 10911);
Expand All @@ -125,6 +132,7 @@ public void init() {
when(plugin.newConnector(any())).thenReturn(new TestConnector());
when(plugin.delegatingLoader()).thenReturn(delegatingClassLoader);
when(delegatingClassLoader.pluginClassLoader(any())).thenReturn(pluginClassLoader);
connectUtil.when(() -> ConnectUtil.fetchAllConsumerGroupList(connectConfig)).thenReturn(Sets.newHashSet());
Thread.currentThread().setContextClassLoader(pluginClassLoader);

connectConfig = new WorkerConfig();
Expand Down Expand Up @@ -179,6 +187,7 @@ public void init() {

@After
public void destroy() throws InterruptedException {
connectUtil.close();
TimeUnit.SECONDS.sleep(2);
worker.stop();
TestUtils.deleteFile(new File(System.getProperty("user.home") + File.separator + "testConnectorStore"));
Expand Down

0 comments on commit eebbf66

Please sign in to comment.