Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #132]Define filtering rules to synchronize only filtered messages #136

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,8 @@ public List<KeyValue> taskConfigs(int maxTasks) {
DataType.COMMON_MESSAGE.ordinal(),
this.replicatorConfig.isSrcAclEnable(),
this.replicatorConfig.getSrcAccessKey(),
this.replicatorConfig.getSrcSecretKey()
this.replicatorConfig.getSrcSecretKey(),
this.replicatorConfig.getFilterRule()
);
return this.replicatorConfig.getTaskDivideStrategy().divide(this.topicRouteMap, tdc, maxTasks);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
Expand All @@ -56,6 +57,8 @@ public class RmqSourceTask extends SourceTask {

private static final Logger log = LoggerFactory.getLogger(RmqSourceTask.class);

private static final String NULL_STR = "null";

private final String taskId;
private final TaskConfig config;
private DefaultMQPullConsumer consumer;
Expand Down Expand Up @@ -162,7 +165,9 @@ private List<ConnectRecord> pollCommonMessage() {
if (started) {
try {
for (TaskTopicInfo taskTopicConfig : this.mqOffsetMap.keySet()) {
PullResult pullResult = consumer.pull(taskTopicConfig, "*",
String subExpression = (StringUtils.isEmpty(this.config.getFilterRule()) || NULL_STR.equals(this.config.getFilterRule()))
? "*" : this.config.getFilterRule();
PullResult pullResult = consumer.pull(taskTopicConfig, subExpression,
this.mqOffsetMap.get(taskTopicConfig), 32);
switch (pullResult.getPullStatus()) {
case FOUND: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public class ConfigDefine {

public static final String OFFSET_SYNC_TOPIC = "offset.sync.topic";

public static final String FILTER_RULE = "filter-rule";

/**
* The required key for all configurations.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class RmqConnectorConfig {
private boolean targetAclEnable = false;
private String targetAccessKey;
private String targetSecretKey;
private String filterRule;

public RmqConnectorConfig() {
}
Expand All @@ -67,6 +68,7 @@ public void init(KeyValue config) {
refreshInterval = config.getLong(ConfigDefine.REFRESH_INTERVAL, 3);
renamePattern = config.getString(ConfigDefine.CONN_TOPIC_RENAME_FMT);
offsetSyncTopic = config.getString(ConfigDefine.OFFSET_SYNC_TOPIC);
filterRule = config.getString(ConfigDefine.FILTER_RULE);

if (config.containsKey(ConfigDefine.CONN_SOURCE_ACL_ENABLE)) {
srcAclEnable = Boolean.parseBoolean(config.getString(ConfigDefine.CONN_SOURCE_ACL_ENABLE));
Expand Down Expand Up @@ -162,4 +164,8 @@ public String getTargetAccessKey() {
public String getTargetSecretKey() {
return targetSecretKey;
}

public String getFilterRule() {
return filterRule;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public class TaskConfig {
private boolean srcAclEnable = false;
private String srcAccessKey;
private String srcSecretKey;
private String filterRule;

public String getSourceGroup() {
return sourceGroup;
Expand Down Expand Up @@ -130,4 +131,12 @@ public String getSrcSecretKey() {
public void setSrcSecretKey(String srcSecretKey) {
this.srcSecretKey = srcSecretKey;
}

public String getFilterRule() {
return filterRule;
}

public void setFilterRule(String filterRule) {
this.filterRule = filterRule;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ public enum TaskConfigEnum {
TASK_SOURCE_RECORD_CONVERTER("source-record-converter"),
TASK_SOURCE_ACL_ENABLE("srcAclEnable"),
TASK_SOURCE_ACCESS_KEY("srcAccessKey"),
TASK_SOURCE_SECRET_KEY("srcSecretKey");
TASK_SOURCE_SECRET_KEY("srcSecretKey"),
TASK_FILTER_RULE("filterRule");

private String key;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ public class TaskDivideConfig {

private String srcSecretKey;

private String filterRule;

public TaskDivideConfig(String sourceNamesrvAddr, String srcCluster, String storeTopic, String srcRecordConverter,
int dataType, boolean srcAclEnable, String srcAccessKey, String srcSecretKey) {
int dataType, boolean srcAclEnable, String srcAccessKey, String srcSecretKey, String filterRule) {
this.sourceNamesrvAddr = sourceNamesrvAddr;
this.srcCluster = srcCluster;
this.storeTopic = storeTopic;
Expand All @@ -44,6 +46,7 @@ public TaskDivideConfig(String sourceNamesrvAddr, String srcCluster, String stor
this.srcAclEnable = srcAclEnable;
this.srcAccessKey = srcAccessKey;
this.srcSecretKey = srcSecretKey;
this.filterRule = filterRule;
}

public String getSourceNamesrvAddr() {
Expand Down Expand Up @@ -109,4 +112,12 @@ public String getSrcSecretKey() {
public void setSrcSecretKey(String srcSecretKey) {
this.srcSecretKey = srcSecretKey;
}

public String getFilterRule() {
return filterRule;
}

public void setFilterRule(String filterRule) {
this.filterRule = filterRule;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public List<KeyValue> divide(Map<String, Set<TaskTopicInfo>> topicMap, TaskDivid
keyValue.put(TaskConfigEnum.TASK_SOURCE_ACL_ENABLE.getKey(), String.valueOf(tdc.isSrcAclEnable()));
keyValue.put(TaskConfigEnum.TASK_SOURCE_ACCESS_KEY.getKey(), tdc.getSrcAccessKey());
keyValue.put(TaskConfigEnum.TASK_SOURCE_SECRET_KEY.getKey(), tdc.getSrcSecretKey());
keyValue.put(TaskConfigEnum.TASK_FILTER_RULE.getKey(), tdc.getFilterRule());
config.add(keyValue);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public List<KeyValue> divide(Map<String, Set<TaskTopicInfo>> topicRouteMap, Task
keyValue.put(TaskConfigEnum.TASK_SOURCE_ACL_ENABLE.getKey(), String.valueOf(tdc.isSrcAclEnable()));
keyValue.put(TaskConfigEnum.TASK_SOURCE_ACCESS_KEY.getKey(), tdc.getSrcAccessKey());
keyValue.put(TaskConfigEnum.TASK_SOURCE_SECRET_KEY.getKey(), tdc.getSrcSecretKey());
keyValue.put(TaskConfigEnum.TASK_FILTER_RULE.getKey(), tdc.getFilterRule());
config.add(keyValue);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public List<KeyValue> divide(Map<String, Set<TaskTopicInfo>> topicRouteMap, Task
keyValue.put(TaskConfigEnum.TASK_SOURCE_ACL_ENABLE.getKey(), String.valueOf(tdc.isSrcAclEnable()));
keyValue.put(TaskConfigEnum.TASK_SOURCE_ACCESS_KEY.getKey(), tdc.getSrcAccessKey());
keyValue.put(TaskConfigEnum.TASK_SOURCE_SECRET_KEY.getKey(), tdc.getSrcSecretKey());
keyValue.put(TaskConfigEnum.TASK_FILTER_RULE.getKey(), tdc.getFilterRule());
config.add(keyValue);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ public void testDivideTaskByTopic() {
DataType.COMMON_MESSAGE.ordinal(),
false,
"",
""
"",
"TagA"
);
List<KeyValue> taskConfigs = config.getTaskDivideStrategy().divide(topicRouteMap, tdc, 4);
assertThat(taskConfigs.size()).isEqualTo(4);
Expand Down Expand Up @@ -125,7 +126,8 @@ public void testDivideTaskByQueue() {
DataType.COMMON_MESSAGE.ordinal(),
false,
"",
""
"",
"TagA||TagB"
);
List<KeyValue> taskConfigs = config.getTaskDivideStrategy().divide(topicRouteMap, tdc, 4);
assertThat(taskConfigs.size()).isEqualTo(4);
Expand Down Expand Up @@ -184,6 +186,7 @@ public void testDivideTaskByHash() {
DataType.COMMON_MESSAGE.ordinal(),
false,
"",
"",
""
);
DivideTaskByConsistentHash hash = new DivideTaskByConsistentHash();
Expand Down