From c82fad8f19dbd9bf4e136d9fd44b7866ba2397e7 Mon Sep 17 00:00:00 2001 From: zhangjidi Date: Thu, 12 May 2022 09:07:19 +0800 Subject: [PATCH] [ISSUE #132]Define filtering rules to synchronize only filtered messages --- .../rocketmq/replicator/RmqSourceReplicator.java | 3 ++- .../apache/rocketmq/replicator/RmqSourceTask.java | 7 ++++++- .../rocketmq/replicator/config/ConfigDefine.java | 2 ++ .../replicator/config/RmqConnectorConfig.java | 6 ++++++ .../rocketmq/replicator/config/TaskConfig.java | 9 +++++++++ .../rocketmq/replicator/config/TaskConfigEnum.java | 3 ++- .../replicator/config/TaskDivideConfig.java | 13 ++++++++++++- .../strategy/DivideTaskByConsistentHash.java | 1 + .../replicator/strategy/DivideTaskByQueue.java | 1 + .../replicator/strategy/DivideTaskByTopic.java | 1 + .../replicator/DefaultTaskDivideStrategyTest.java | 7 +++++-- 11 files changed, 47 insertions(+), 6 deletions(-) diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java index e4966d17..cd709758 100644 --- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java +++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java @@ -214,7 +214,8 @@ public List taskConfigs(int maxTasks) { DataType.COMMON_MESSAGE.ordinal(), this.replicatorConfig.isSrcAclEnable(), this.replicatorConfig.getSrcAccessKey(), - this.replicatorConfig.getSrcSecretKey() + this.replicatorConfig.getSrcSecretKey(), + this.replicatorConfig.getFilterRule() ); return this.replicatorConfig.getTaskDivideStrategy().divide(this.topicRouteMap, tdc, maxTasks); } diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java index 73cebfd5..7a7669bf 100644 --- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java +++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java @@ -34,6 +34,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; @@ -56,6 +57,8 @@ public class RmqSourceTask extends SourceTask { private static final Logger log = LoggerFactory.getLogger(RmqSourceTask.class); + private static final String NULL_STR = "null"; + private final String taskId; private final TaskConfig config; private DefaultMQPullConsumer consumer; @@ -162,7 +165,9 @@ private List pollCommonMessage() { if (started) { try { for (TaskTopicInfo taskTopicConfig : this.mqOffsetMap.keySet()) { - PullResult pullResult = consumer.pull(taskTopicConfig, "*", + String subExpression = (StringUtils.isEmpty(this.config.getFilterRule()) || NULL_STR.equals(this.config.getFilterRule())) + ? "*" : this.config.getFilterRule(); + PullResult pullResult = consumer.pull(taskTopicConfig, subExpression, this.mqOffsetMap.get(taskTopicConfig), 32); switch (pullResult.getPullStatus()) { case FOUND: { diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/ConfigDefine.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/ConfigDefine.java index fea2cfb3..91cd5406 100644 --- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/ConfigDefine.java +++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/ConfigDefine.java @@ -51,6 +51,8 @@ public class ConfigDefine { public static final String OFFSET_SYNC_TOPIC = "offset.sync.topic"; + public static final String FILTER_RULE = "filter-rule"; + /** * The required key for all configurations. */ diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/RmqConnectorConfig.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/RmqConnectorConfig.java index 40edee02..976b7ba9 100644 --- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/RmqConnectorConfig.java +++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/RmqConnectorConfig.java @@ -42,6 +42,7 @@ public class RmqConnectorConfig { private boolean targetAclEnable = false; private String targetAccessKey; private String targetSecretKey; + private String filterRule; public RmqConnectorConfig() { } @@ -67,6 +68,7 @@ public void init(KeyValue config) { refreshInterval = config.getLong(ConfigDefine.REFRESH_INTERVAL, 3); renamePattern = config.getString(ConfigDefine.CONN_TOPIC_RENAME_FMT); offsetSyncTopic = config.getString(ConfigDefine.OFFSET_SYNC_TOPIC); + filterRule = config.getString(ConfigDefine.FILTER_RULE); if (config.containsKey(ConfigDefine.CONN_SOURCE_ACL_ENABLE)) { srcAclEnable = Boolean.parseBoolean(config.getString(ConfigDefine.CONN_SOURCE_ACL_ENABLE)); @@ -162,4 +164,8 @@ public String getTargetAccessKey() { public String getTargetSecretKey() { return targetSecretKey; } + + public String getFilterRule() { + return filterRule; + } } diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/TaskConfig.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/TaskConfig.java index 79215851..c92ec06e 100644 --- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/TaskConfig.java +++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/TaskConfig.java @@ -30,6 +30,7 @@ public class TaskConfig { private boolean srcAclEnable = false; private String srcAccessKey; private String srcSecretKey; + private String filterRule; public String getSourceGroup() { return sourceGroup; @@ -130,4 +131,12 @@ public String getSrcSecretKey() { public void setSrcSecretKey(String srcSecretKey) { this.srcSecretKey = srcSecretKey; } + + public String getFilterRule() { + return filterRule; + } + + public void setFilterRule(String filterRule) { + this.filterRule = filterRule; + } } diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/TaskConfigEnum.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/TaskConfigEnum.java index 520c31f1..17e661f7 100644 --- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/TaskConfigEnum.java +++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/TaskConfigEnum.java @@ -34,7 +34,8 @@ public enum TaskConfigEnum { TASK_SOURCE_RECORD_CONVERTER("source-record-converter"), TASK_SOURCE_ACL_ENABLE("srcAclEnable"), TASK_SOURCE_ACCESS_KEY("srcAccessKey"), - TASK_SOURCE_SECRET_KEY("srcSecretKey"); + TASK_SOURCE_SECRET_KEY("srcSecretKey"), + TASK_FILTER_RULE("filterRule"); private String key; diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/TaskDivideConfig.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/TaskDivideConfig.java index 16a74ed5..d6bc0b16 100644 --- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/TaskDivideConfig.java +++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/config/TaskDivideConfig.java @@ -34,8 +34,10 @@ public class TaskDivideConfig { private String srcSecretKey; + private String filterRule; + public TaskDivideConfig(String sourceNamesrvAddr, String srcCluster, String storeTopic, String srcRecordConverter, - int dataType, boolean srcAclEnable, String srcAccessKey, String srcSecretKey) { + int dataType, boolean srcAclEnable, String srcAccessKey, String srcSecretKey, String filterRule) { this.sourceNamesrvAddr = sourceNamesrvAddr; this.srcCluster = srcCluster; this.storeTopic = storeTopic; @@ -44,6 +46,7 @@ public TaskDivideConfig(String sourceNamesrvAddr, String srcCluster, String stor this.srcAclEnable = srcAclEnable; this.srcAccessKey = srcAccessKey; this.srcSecretKey = srcSecretKey; + this.filterRule = filterRule; } public String getSourceNamesrvAddr() { @@ -109,4 +112,12 @@ public String getSrcSecretKey() { public void setSrcSecretKey(String srcSecretKey) { this.srcSecretKey = srcSecretKey; } + + public String getFilterRule() { + return filterRule; + } + + public void setFilterRule(String filterRule) { + this.filterRule = filterRule; + } } diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByConsistentHash.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByConsistentHash.java index b5529aa7..4ce1aaed 100644 --- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByConsistentHash.java +++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByConsistentHash.java @@ -67,6 +67,7 @@ public List divide(Map> topicMap, TaskDivid keyValue.put(TaskConfigEnum.TASK_SOURCE_ACL_ENABLE.getKey(), String.valueOf(tdc.isSrcAclEnable())); keyValue.put(TaskConfigEnum.TASK_SOURCE_ACCESS_KEY.getKey(), tdc.getSrcAccessKey()); keyValue.put(TaskConfigEnum.TASK_SOURCE_SECRET_KEY.getKey(), tdc.getSrcSecretKey()); + keyValue.put(TaskConfigEnum.TASK_FILTER_RULE.getKey(), tdc.getFilterRule()); config.add(keyValue); } diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByQueue.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByQueue.java index db2a03b8..5004df26 100644 --- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByQueue.java +++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByQueue.java @@ -62,6 +62,7 @@ public List divide(Map> topicRouteMap, Task keyValue.put(TaskConfigEnum.TASK_SOURCE_ACL_ENABLE.getKey(), String.valueOf(tdc.isSrcAclEnable())); keyValue.put(TaskConfigEnum.TASK_SOURCE_ACCESS_KEY.getKey(), tdc.getSrcAccessKey()); keyValue.put(TaskConfigEnum.TASK_SOURCE_SECRET_KEY.getKey(), tdc.getSrcSecretKey()); + keyValue.put(TaskConfigEnum.TASK_FILTER_RULE.getKey(), tdc.getFilterRule()); config.add(keyValue); } diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByTopic.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByTopic.java index 010cb904..5fd0a3a3 100644 --- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByTopic.java +++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByTopic.java @@ -54,6 +54,7 @@ public List divide(Map> topicRouteMap, Task keyValue.put(TaskConfigEnum.TASK_SOURCE_ACL_ENABLE.getKey(), String.valueOf(tdc.isSrcAclEnable())); keyValue.put(TaskConfigEnum.TASK_SOURCE_ACCESS_KEY.getKey(), tdc.getSrcAccessKey()); keyValue.put(TaskConfigEnum.TASK_SOURCE_SECRET_KEY.getKey(), tdc.getSrcSecretKey()); + keyValue.put(TaskConfigEnum.TASK_FILTER_RULE.getKey(), tdc.getFilterRule()); config.add(keyValue); } diff --git a/connectors/rocketmq-replicator/src/test/java/org/apache/rocketmq/replicator/DefaultTaskDivideStrategyTest.java b/connectors/rocketmq-replicator/src/test/java/org/apache/rocketmq/replicator/DefaultTaskDivideStrategyTest.java index 60dfe0ed..556b8910 100644 --- a/connectors/rocketmq-replicator/src/test/java/org/apache/rocketmq/replicator/DefaultTaskDivideStrategyTest.java +++ b/connectors/rocketmq-replicator/src/test/java/org/apache/rocketmq/replicator/DefaultTaskDivideStrategyTest.java @@ -78,7 +78,8 @@ public void testDivideTaskByTopic() { DataType.COMMON_MESSAGE.ordinal(), false, "", - "" + "", + "TagA" ); List taskConfigs = config.getTaskDivideStrategy().divide(topicRouteMap, tdc, 4); assertThat(taskConfigs.size()).isEqualTo(4); @@ -125,7 +126,8 @@ public void testDivideTaskByQueue() { DataType.COMMON_MESSAGE.ordinal(), false, "", - "" + "", + "TagA||TagB" ); List taskConfigs = config.getTaskDivideStrategy().divide(topicRouteMap, tdc, 4); assertThat(taskConfigs.size()).isEqualTo(4); @@ -184,6 +186,7 @@ public void testDivideTaskByHash() { DataType.COMMON_MESSAGE.ordinal(), false, "", + "", "" ); DivideTaskByConsistentHash hash = new DivideTaskByConsistentHash();