diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/SinkConnectorConfig.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/SinkConnectorConfig.java index f533093d..fa7e5b1f 100644 --- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/SinkConnectorConfig.java +++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/SinkConnectorConfig.java @@ -19,11 +19,10 @@ package org.apache.rocketmq.connect.runtime.config; import com.google.common.base.Splitter; -import java.util.HashSet; +import java.util.HashMap; import java.util.List; -import java.util.Set; +import java.util.Map; import org.apache.commons.collections4.CollectionUtils; -import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue; @@ -31,13 +30,19 @@ public class SinkConnectorConfig extends ConnectConfig { - public static Set parseTopicList(ConnectKeyValue taskConfig) { - String messageQueueStr = taskConfig.getString(RuntimeConfigDefine.CONNECT_TOPICNAME); - if (StringUtils.isBlank(messageQueueStr)) { - return null; + public static Map parseTopicList(ConnectKeyValue taskConfig) { + String topicNameAndTagss = taskConfig.getString(RuntimeConfigDefine.CONNECT_TOPICNAME); + List topicTagList = Splitter.on(COMMA).omitEmptyStrings().trimResults().splitToList(topicNameAndTagss); + Map topicNameAndTagssMap = new HashMap<>(8); + for (String topicTagPair : topicTagList) { + List topicAndTag = Splitter.on(SEMICOLON).omitEmptyStrings().trimResults().splitToList(topicTagPair); + if (topicAndTag.size() == 1) { + topicNameAndTagssMap.put(topicAndTag.get(0), "*"); + } else { + topicNameAndTagssMap.put(topicAndTag.get(0), topicAndTag.get(1)); + } } - List topicList = Splitter.on(SEMICOLON).omitEmptyStrings().trimResults().splitToList(messageQueueStr); - return new HashSet<>(topicList); + return topicNameAndTagssMap; } public static MessageQueue parseMessageQueueList(String messageQueueStr) { diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java index 8f95747a..0159ad1a 100644 --- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java +++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java @@ -153,6 +153,8 @@ public class WorkerSinkTask implements WorkerTask { private WorkerSinkTaskContext sinkTaskContext; + private Map topicTagMap; + private final TransformChain transformChain; public static final String BROKER_NAME = "brokerName"; @@ -273,7 +275,8 @@ private void setQueueOffset() { } private void registTopics() { - Set topics = SinkConnectorConfig.parseTopicList(taskConfig); + topicTagMap = SinkConnectorConfig.parseTopicList(taskConfig); + Set topics = topicTagMap.keySet(); if (org.apache.commons.collections4.CollectionUtils.isEmpty(topics)) { throw new ConnectException("sink connector topics config can be null, please check sink connector config info"); } @@ -372,7 +375,7 @@ private void pullMessageFromQueues() throws InterruptedException { final long beginPullMsgTimestamp = System.currentTimeMillis(); try { shouldStopPullMsg(); - pullResult = consumer.pullBlockIfNotFound(entry.getKey(), "*", entry.getValue(), MAX_MESSAGE_NUM); + pullResult = consumer.pullBlockIfNotFound(entry.getKey(), topicTagMap.get(entry.getKey().getTopic()), entry.getValue(), MAX_MESSAGE_NUM); pullMsgErrorCount = 0; } catch (MQClientException e) { pullMsgErrorCount++;