From 040613f4c412b212fbf13e8957523fa3b1008053 Mon Sep 17 00:00:00 2001 From: sunxiaojian Date: Thu, 2 Mar 2023 11:05:38 +0800 Subject: [PATCH] Adding the COMPACTION attribute is supported when creating a topic in the BrokerBasedLog #436 --- .../metrics/reporter/RocketMQClientUtil.java | 13 +++---- .../reporter/RocketMQScheduledReporter.java | 18 +++++---- pom.xml | 3 +- rocketmq-connect-runtime/pom.xml | 3 -- .../AbstractStateManagementService.java | 30 +++++---------- .../connect/runtime/utils/ConnectUtil.java | 6 +-- .../utils/datasync/BrokerBasedLog.java | 6 +++ .../runtime/connectorwrapper/WorkerTest.java | 37 ++++++++++++------- 8 files changed, 58 insertions(+), 58 deletions(-) diff --git a/metric-exporter/src/main/java/org/apache/rocketmq/connect/metrics/reporter/RocketMQClientUtil.java b/metric-exporter/src/main/java/org/apache/rocketmq/connect/metrics/reporter/RocketMQClientUtil.java index e3587d693..4238bb8d4 100644 --- a/metric-exporter/src/main/java/org/apache/rocketmq/connect/metrics/reporter/RocketMQClientUtil.java +++ b/metric-exporter/src/main/java/org/apache/rocketmq/connect/metrics/reporter/RocketMQClientUtil.java @@ -17,7 +17,10 @@ package org.apache.rocketmq.connect.metrics.reporter; -import com.beust.jcommander.internal.Sets; +import com.google.common.collect.Sets; +import java.util.Map; +import java.util.Set; +import java.util.UUID; import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.exception.MQClientException; @@ -33,10 +36,6 @@ import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.apache.rocketmq.tools.command.CommandUtil; -import java.util.HashMap; -import java.util.Set; -import java.util.UUID; - /** * rocket connect util @@ -94,7 +93,7 @@ public static void createTopic(DefaultMQAdminExt defaultMQAdminExt, TopicConfig topicConfig) { try { ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo(); - HashMap> clusterAddrTable = clusterInfo.getClusterAddrTable(); + Map> clusterAddrTable = clusterInfo.getClusterAddrTable(); Set clusterNameSet = clusterAddrTable.keySet(); for (String clusterName : clusterNameSet) { Set masterSet = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName); @@ -140,7 +139,7 @@ public static String createSubGroup(DefaultMQAdminExt defaultMQAdminExt, String SubscriptionGroupConfig initConfig = new SubscriptionGroupConfig(); initConfig.setGroupName(subGroup); ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo(); - HashMap> clusterAddrTable = clusterInfo.getClusterAddrTable(); + Map> clusterAddrTable = clusterInfo.getClusterAddrTable(); Set clusterNameSet = clusterAddrTable.keySet(); for (String clusterName : clusterNameSet) { Set masterSet = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName); diff --git a/metric-exporter/src/main/java/org/apache/rocketmq/connect/metrics/reporter/RocketMQScheduledReporter.java b/metric-exporter/src/main/java/org/apache/rocketmq/connect/metrics/reporter/RocketMQScheduledReporter.java index bc504c44e..29229d993 100644 --- a/metric-exporter/src/main/java/org/apache/rocketmq/connect/metrics/reporter/RocketMQScheduledReporter.java +++ b/metric-exporter/src/main/java/org/apache/rocketmq/connect/metrics/reporter/RocketMQScheduledReporter.java @@ -16,9 +16,12 @@ */ package org.apache.rocketmq.connect.metrics.reporter; - import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.Timer; +import java.nio.charset.StandardCharsets; +import java.util.Map; +import java.util.SortedMap; +import java.util.concurrent.TimeUnit; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.message.Message; @@ -28,11 +31,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.nio.charset.StandardCharsets; -import java.util.Map; -import java.util.SortedMap; -import java.util.concurrent.TimeUnit; - /** * rocketmq exporter */ @@ -160,7 +158,9 @@ private void send(MetricName name, Double value) { message.setTopic(this.topic); message.setKeys(name.getStr()); message.setBody(value.toString().getBytes(StandardCharsets.UTF_8)); - producer.send(message); + if (producer != null) { + producer.send(message); + } } catch (Exception e) { log.error("Send metrics error", e); } @@ -169,6 +169,8 @@ private void send(MetricName name, Double value) { @Override public void close() { super.close(); - producer.shutdown(); + if (producer != null) { + producer.shutdown(); + } } } diff --git a/pom.xml b/pom.xml index 2429e8b1d..ecc65034d 100644 --- a/pom.xml +++ b/pom.xml @@ -41,7 +41,6 @@ - 4.7.1 4.13.1 3.22.0 4.7.0 @@ -57,7 +56,7 @@ 2.13.4.1 4.4 - 4.7.1 + 5.0.0 3.8.1 UTF-8 diff --git a/rocketmq-connect-runtime/pom.xml b/rocketmq-connect-runtime/pom.xml index 82bd7a610..4ed44af40 100644 --- a/rocketmq-connect-runtime/pom.xml +++ b/rocketmq-connect-runtime/pom.xml @@ -42,9 +42,6 @@ 1.8 1.8 - - 4.7.1 - diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/AbstractStateManagementService.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/AbstractStateManagementService.java index 153e53791..edad9f188 100644 --- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/AbstractStateManagementService.java +++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/AbstractStateManagementService.java @@ -21,33 +21,29 @@ import io.openmessaging.connector.api.data.SchemaAndValue; import io.openmessaging.connector.api.data.SchemaBuilder; import io.openmessaging.connector.api.data.Struct; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; import org.apache.rocketmq.connect.runtime.common.ConnAndTaskStatus; import org.apache.rocketmq.connect.runtime.common.LoggerName; import org.apache.rocketmq.connect.runtime.config.WorkerConfig; import org.apache.rocketmq.connect.runtime.connectorwrapper.status.AbstractStatus; import org.apache.rocketmq.connect.runtime.connectorwrapper.status.ConnectorStatus; import org.apache.rocketmq.connect.runtime.connectorwrapper.status.TaskStatus; -import org.apache.rocketmq.connect.runtime.serialization.Serdes; import org.apache.rocketmq.connect.runtime.store.KeyValueStore; import org.apache.rocketmq.connect.runtime.utils.Callback; -import org.apache.rocketmq.connect.runtime.utils.ConnectUtil; import org.apache.rocketmq.connect.runtime.utils.ConnectorTaskId; import org.apache.rocketmq.connect.runtime.utils.Utils; -import org.apache.rocketmq.connect.runtime.utils.datasync.BrokerBasedLog; import org.apache.rocketmq.connect.runtime.utils.datasync.DataSynchronizer; import org.apache.rocketmq.connect.runtime.utils.datasync.DataSynchronizerCallback; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - /** * State management service */ @@ -100,15 +96,7 @@ public void initialize(WorkerConfig config, RecordConverter converter) { this.converter.configure(new HashMap<>()); this.statusTopic = config.getConnectStatusTopic(); this.dataSynchronizer = initializationDataSynchronizer(config); - - new BrokerBasedLog(config, - this.statusTopic, - ConnectUtil.createGroupName(statusManagePrefix, config.getWorkerId()), - new StatusChangeCallback(), - Serdes.serdeFrom(String.class), - Serdes.serdeFrom(byte[].class), - enabledCompactTopic() - ); + } @Override diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java index d65849a36..3010a78e5 100644 --- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java +++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java @@ -17,8 +17,8 @@ package org.apache.rocketmq.connect.runtime.utils; -import com.beust.jcommander.internal.Sets; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import io.openmessaging.connector.api.data.RecordOffset; import io.openmessaging.connector.api.data.RecordPartition; import org.apache.commons.lang3.StringUtils; @@ -177,7 +177,7 @@ public static void createTopic(WorkerConfig connectConfig, TopicConfig topicConf try { defaultMQAdminExt = startMQAdminTool(connectConfig); ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo(); - HashMap> clusterAddrTable = clusterInfo.getClusterAddrTable(); + Map> clusterAddrTable = clusterInfo.getClusterAddrTable(); Set clusterNameSet = clusterAddrTable.keySet(); for (String clusterName : clusterNameSet) { Set masterSet = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName); @@ -248,7 +248,7 @@ public static String createSubGroup(WorkerConfig connectConfig, String subGroup) SubscriptionGroupConfig initConfig = new SubscriptionGroupConfig(); initConfig.setGroupName(subGroup); ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo(); - HashMap> clusterAddrTable = clusterInfo.getClusterAddrTable(); + Map> clusterAddrTable = clusterInfo.getClusterAddrTable(); Set clusterNameSet = clusterAddrTable.keySet(); for (String clusterName : clusterNameSet) { Set masterSet = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName); diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/datasync/BrokerBasedLog.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/datasync/BrokerBasedLog.java index 258adeeb6..ce37d0786 100644 --- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/datasync/BrokerBasedLog.java +++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/datasync/BrokerBasedLog.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.connect.runtime.utils.datasync; +import com.google.common.collect.Maps; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -137,6 +138,11 @@ private void prepare() { if (!ConnectUtil.isTopicExist(workerConfig, topicName)) { log.info("Try to create store topic: {}!", topicName); TopicConfig topicConfig = new TopicConfig(topicName, 1, 1, PermName.PERM_READ | PermName.PERM_WRITE); + if (enabledCompactTopic) { + Map attributes = Maps.newConcurrentMap(); + attributes.put("+cleanup.policy", "COMPACTION"); + topicConfig.setAttributes(attributes); + } ConnectUtil.createTopic(workerConfig, topicConfig); } } diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java index 5570be210..01ac4735e 100644 --- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java +++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java @@ -17,9 +17,20 @@ package org.apache.rocketmq.connect.runtime.connectorwrapper; +import com.google.common.collect.Sets; import io.openmessaging.connector.api.component.connector.ConnectorContext; import io.openmessaging.connector.api.data.ConnectRecord; import io.openmessaging.internal.DefaultKeyValue; +import java.io.File; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue; import org.apache.rocketmq.connect.runtime.config.ConnectorConfig; @@ -45,28 +56,21 @@ import org.apache.rocketmq.connect.runtime.service.local.LocalStateManagementServiceImpl; import org.apache.rocketmq.connect.runtime.stats.ConnectStatsManager; import org.apache.rocketmq.connect.runtime.stats.ConnectStatsService; +import org.apache.rocketmq.connect.runtime.utils.ConnectUtil; import org.apache.rocketmq.connect.runtime.utils.ConnectorTaskId; import org.apache.rocketmq.connect.runtime.utils.TestUtils; +import org.apache.rocketmq.connect.runtime.utils.datasync.DataSynchronizer; import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; +import org.mockito.MockedStatic; import org.mockito.junit.MockitoJUnitRunner; -import java.io.File; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; - import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) @@ -75,9 +79,6 @@ public class WorkerTest { @Mock private PositionManagementService positionManagementService; - @Mock - private PositionManagementService offsetManagementService; - @Mock private ConfigManagementService configManagementService; @@ -111,12 +112,18 @@ public class WorkerTest { private WrapperStatusListener wrapperStatusListener; + @Mock private StateManagementService stateManagementService; private ServerResponseMocker nameServerMocker; private ServerResponseMocker brokerMocker; + @Mock + protected DataSynchronizer dataSynchronizer; + + private final MockedStatic connectUtil = mockStatic(ConnectUtil.class); + @Before public void init() { nameServerMocker = NameServerMocker.startByDefaultConf(9876, 10911); @@ -125,6 +132,7 @@ public void init() { when(plugin.newConnector(any())).thenReturn(new TestConnector()); when(plugin.delegatingLoader()).thenReturn(delegatingClassLoader); when(delegatingClassLoader.pluginClassLoader(any())).thenReturn(pluginClassLoader); + connectUtil.when(() -> ConnectUtil.fetchAllConsumerGroupList(connectConfig)).thenReturn(Sets.newHashSet()); Thread.currentThread().setContextClassLoader(pluginClassLoader); connectConfig = new WorkerConfig(); @@ -179,6 +187,7 @@ public void init() { @After public void destroy() throws InterruptedException { + connectUtil.close(); TimeUnit.SECONDS.sleep(2); worker.stop(); TestUtils.deleteFile(new File(System.getProperty("user.home") + File.separator + "testConnectorStore"));