From 47d47e56f46b3d66cf2e0feeb50a91b21c97a2de Mon Sep 17 00:00:00 2001 From: rongtong Date: Fri, 7 Apr 2023 15:51:53 +0800 Subject: [PATCH] [ISSUE #466] Make srcTopics and srcTopicTags to one parameter (#467) --- .../ReplicatorCheckpointConnector.java | 4 ++-- .../replicator/ReplicatorCheckpointTask.java | 4 ++-- .../config/ReplicatorConnectorConfig.java | 24 ++----------------- 3 files changed, 6 insertions(+), 26 deletions(-) diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointConnector.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointConnector.java index e866da60..3849f645 100644 --- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointConnector.java +++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointConnector.java @@ -64,7 +64,7 @@ public List taskConfigs(int maxTasks) { keyValue.put(ReplicatorConnectorConfig.SRC_ACL_ENABLE, config.getString(ReplicatorConnectorConfig.SRC_ACL_ENABLE, "false")); keyValue.put(ReplicatorConnectorConfig.SRC_ACCESS_KEY, config.getString(ReplicatorConnectorConfig.SRC_ACCESS_KEY, "")); keyValue.put(ReplicatorConnectorConfig.SRC_SECRET_KEY, config.getString(ReplicatorConnectorConfig.SRC_SECRET_KEY, "")); - keyValue.put(ReplicatorConnectorConfig.SRC_TOPICS, config.getString(ReplicatorConnectorConfig.SRC_TOPICS)); + keyValue.put(ReplicatorConnectorConfig.SRC_TOPICTAGS, config.getString(ReplicatorConnectorConfig.SRC_TOPICTAGS)); keyValue.put(ReplicatorConnectorConfig.DEST_CLOUD, config.getString(ReplicatorConnectorConfig.DEST_CLOUD)); keyValue.put(ReplicatorConnectorConfig.DEST_REGION, config.getString(ReplicatorConnectorConfig.DEST_REGION)); keyValue.put(ReplicatorConnectorConfig.DEST_CLUSTER, config.getString(ReplicatorConnectorConfig.DEST_CLUSTER)); @@ -94,7 +94,7 @@ public Class taskClass() { add(ReplicatorConnectorConfig.SRC_REGION); add(ReplicatorConnectorConfig.SRC_CLUSTER); add(ReplicatorConnectorConfig.SRC_ENDPOINT); - add(ReplicatorConnectorConfig.SRC_TOPICS); + add(ReplicatorConnectorConfig.SRC_TOPICTAGS); add(ReplicatorConnectorConfig.DEST_CLOUD); add(ReplicatorConnectorConfig.DEST_REGION); add(ReplicatorConnectorConfig.DEST_CLUSTER); diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointTask.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointTask.java index 8eba84a0..6c9946d2 100644 --- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointTask.java +++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointTask.java @@ -144,7 +144,7 @@ public List poll() throws InterruptedException { lastCheckPointTimestamp = System.currentTimeMillis(); return null; } - Set srcTopics = connectorConfig.getSrcTopics(connectorConfig.getSrcTopics()); + Set srcTopics = ReplicatorConnectorConfig.getSrcTopicTagMap(connectorConfig.getSrcInstanceId(), connectorConfig.getSrcTopicTags()).keySet(); try { String[] syncGidArr = syncGids.split(connectorConfig.GID_SPLITTER); for (String consumerGroup : syncGidArr) { @@ -258,7 +258,7 @@ private void fillConnectorConfig(KeyValue config) { connectorConfig.setSrcCluster(config.getString(connectorConfig.SRC_CLUSTER)); connectorConfig.setSrcInstanceId(config.getString(connectorConfig.SRC_INSTANCEID)); connectorConfig.setSrcEndpoint(config.getString(connectorConfig.SRC_ENDPOINT)); - connectorConfig.setSrcTopics(config.getString(connectorConfig.SRC_TOPICS)); + connectorConfig.setSrcTopicTags(config.getString(connectorConfig.getSrcTopicTags())); connectorConfig.setDestCloud(config.getString(connectorConfig.DEST_CLOUD)); connectorConfig.setDestRegion(config.getString(connectorConfig.DEST_REGION)); connectorConfig.setDestCluster(config.getString(connectorConfig.DEST_CLUSTER)); diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/ReplicatorConnectorConfig.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/ReplicatorConnectorConfig.java index 2713a165..2db84b69 100644 --- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/ReplicatorConnectorConfig.java +++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/ReplicatorConnectorConfig.java @@ -43,7 +43,6 @@ public class ReplicatorConnectorConfig { private String srcCluster; private String srcInstanceId; private String srcTopicTags; // format topic-1,tag-a;topic-2,tag-b;topic-3,tag-c - private String srcTopics; // format topic-1;topic-2,tag-b;topic-3,tag-c private String srcEndpoint; private boolean srcAclEnable; private boolean autoCreateInnerConsumergroup; @@ -66,7 +65,7 @@ public class ReplicatorConnectorConfig { private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET; // consume from timestamp private long consumeFromTimestamp = System.currentTimeMillis(); - // sourcetask replicate to mq failover strategy + // source task replicate to mq failover strategy private FailoverStrategy failoverStrategy = FailoverStrategy.DISMISS; private boolean enableHeartbeat = true; private boolean enableCheckpoint = true; @@ -90,7 +89,7 @@ public class ReplicatorConnectorConfig { private int syncTps = 1000; private int maxTask = 2; // - private int heartbeatIntervalMs = 1 * 1000; + private int heartbeatIntervalMs = 1000; private int checkpointIntervalMs = 10 * 1000; private long commitOffsetIntervalMs = 10 * 1000; private String heartbeatTopic; @@ -120,8 +119,6 @@ public class ReplicatorConnectorConfig { public final static String SRC_CLUSTER = "src.cluster"; public final static String SRC_INSTANCEID = "src.instanceid"; public final static String SRC_TOPICTAGS = "src.topictags"; - - public final static String SRC_TOPICS = "src.topics"; public final static String SRC_ENDPOINT = "src.endpoint"; public final static String SRC_ACL_ENABLE = "src.acl.enable"; public final static String SRC_ACCESS_KEY = "src.access.key"; @@ -242,15 +239,6 @@ public static Map getSrcTopicTagMap(String srcInstanceId, String return topicTagMap; } - - public static Set getSrcTopics(String srcTopics) { - if (StringUtils.isEmpty(srcTopics) || StringUtils.isBlank(srcTopics)) { - return null; - } - List topicList = Splitter.on(TOPIC_SPLITTER).omitEmptyStrings().trimResults().splitToList(srcTopics); - return new HashSet(topicList); - } - public String getSrcTopicTags() { return srcTopicTags; } @@ -259,14 +247,6 @@ public void setSrcTopicTags(String srcTopicTags) { this.srcTopicTags = srcTopicTags; } - public String getSrcTopics() { - return srcTopics; - } - - public void setSrcTopics(String srcTopics) { - this.srcTopics = srcTopics; - } - public String getSrcEndpoint() { return srcEndpoint; }