Skip to content

Commit

Permalink
[ISSUE #466] Make srcTopics and srcTopicTags to one parameter (#467)
Browse files Browse the repository at this point in the history
  • Loading branch information
RongtongJin authored Apr 7, 2023
1 parent 176bb87 commit 47d47e5
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public List<KeyValue> taskConfigs(int maxTasks) {
keyValue.put(ReplicatorConnectorConfig.SRC_ACL_ENABLE, config.getString(ReplicatorConnectorConfig.SRC_ACL_ENABLE, "false"));
keyValue.put(ReplicatorConnectorConfig.SRC_ACCESS_KEY, config.getString(ReplicatorConnectorConfig.SRC_ACCESS_KEY, ""));
keyValue.put(ReplicatorConnectorConfig.SRC_SECRET_KEY, config.getString(ReplicatorConnectorConfig.SRC_SECRET_KEY, ""));
keyValue.put(ReplicatorConnectorConfig.SRC_TOPICS, config.getString(ReplicatorConnectorConfig.SRC_TOPICS));
keyValue.put(ReplicatorConnectorConfig.SRC_TOPICTAGS, config.getString(ReplicatorConnectorConfig.SRC_TOPICTAGS));
keyValue.put(ReplicatorConnectorConfig.DEST_CLOUD, config.getString(ReplicatorConnectorConfig.DEST_CLOUD));
keyValue.put(ReplicatorConnectorConfig.DEST_REGION, config.getString(ReplicatorConnectorConfig.DEST_REGION));
keyValue.put(ReplicatorConnectorConfig.DEST_CLUSTER, config.getString(ReplicatorConnectorConfig.DEST_CLUSTER));
Expand Down Expand Up @@ -94,7 +94,7 @@ public Class<? extends Task> taskClass() {
add(ReplicatorConnectorConfig.SRC_REGION);
add(ReplicatorConnectorConfig.SRC_CLUSTER);
add(ReplicatorConnectorConfig.SRC_ENDPOINT);
add(ReplicatorConnectorConfig.SRC_TOPICS);
add(ReplicatorConnectorConfig.SRC_TOPICTAGS);
add(ReplicatorConnectorConfig.DEST_CLOUD);
add(ReplicatorConnectorConfig.DEST_REGION);
add(ReplicatorConnectorConfig.DEST_CLUSTER);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public List<ConnectRecord> poll() throws InterruptedException {
lastCheckPointTimestamp = System.currentTimeMillis();
return null;
}
Set<String> srcTopics = connectorConfig.getSrcTopics(connectorConfig.getSrcTopics());
Set<String> srcTopics = ReplicatorConnectorConfig.getSrcTopicTagMap(connectorConfig.getSrcInstanceId(), connectorConfig.getSrcTopicTags()).keySet();
try {
String[] syncGidArr = syncGids.split(connectorConfig.GID_SPLITTER);
for (String consumerGroup : syncGidArr) {
Expand Down Expand Up @@ -258,7 +258,7 @@ private void fillConnectorConfig(KeyValue config) {
connectorConfig.setSrcCluster(config.getString(connectorConfig.SRC_CLUSTER));
connectorConfig.setSrcInstanceId(config.getString(connectorConfig.SRC_INSTANCEID));
connectorConfig.setSrcEndpoint(config.getString(connectorConfig.SRC_ENDPOINT));
connectorConfig.setSrcTopics(config.getString(connectorConfig.SRC_TOPICS));
connectorConfig.setSrcTopicTags(config.getString(connectorConfig.getSrcTopicTags()));
connectorConfig.setDestCloud(config.getString(connectorConfig.DEST_CLOUD));
connectorConfig.setDestRegion(config.getString(connectorConfig.DEST_REGION));
connectorConfig.setDestCluster(config.getString(connectorConfig.DEST_CLUSTER));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ public class ReplicatorConnectorConfig {
private String srcCluster;
private String srcInstanceId;
private String srcTopicTags; // format topic-1,tag-a;topic-2,tag-b;topic-3,tag-c
private String srcTopics; // format topic-1;topic-2,tag-b;topic-3,tag-c
private String srcEndpoint;
private boolean srcAclEnable;
private boolean autoCreateInnerConsumergroup;
Expand All @@ -66,7 +65,7 @@ public class ReplicatorConnectorConfig {
private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
// consume from timestamp
private long consumeFromTimestamp = System.currentTimeMillis();
// sourcetask replicate to mq failover strategy
// source task replicate to mq failover strategy
private FailoverStrategy failoverStrategy = FailoverStrategy.DISMISS;
private boolean enableHeartbeat = true;
private boolean enableCheckpoint = true;
Expand All @@ -90,7 +89,7 @@ public class ReplicatorConnectorConfig {
private int syncTps = 1000;
private int maxTask = 2;
//
private int heartbeatIntervalMs = 1 * 1000;
private int heartbeatIntervalMs = 1000;
private int checkpointIntervalMs = 10 * 1000;
private long commitOffsetIntervalMs = 10 * 1000;
private String heartbeatTopic;
Expand Down Expand Up @@ -120,8 +119,6 @@ public class ReplicatorConnectorConfig {
public final static String SRC_CLUSTER = "src.cluster";
public final static String SRC_INSTANCEID = "src.instanceid";
public final static String SRC_TOPICTAGS = "src.topictags";

public final static String SRC_TOPICS = "src.topics";
public final static String SRC_ENDPOINT = "src.endpoint";
public final static String SRC_ACL_ENABLE = "src.acl.enable";
public final static String SRC_ACCESS_KEY = "src.access.key";
Expand Down Expand Up @@ -242,15 +239,6 @@ public static Map<String, String> getSrcTopicTagMap(String srcInstanceId, String
return topicTagMap;
}


public static Set<String> getSrcTopics(String srcTopics) {
if (StringUtils.isEmpty(srcTopics) || StringUtils.isBlank(srcTopics)) {
return null;
}
List<String> topicList = Splitter.on(TOPIC_SPLITTER).omitEmptyStrings().trimResults().splitToList(srcTopics);
return new HashSet(topicList);
}

public String getSrcTopicTags() {
return srcTopicTags;
}
Expand All @@ -259,14 +247,6 @@ public void setSrcTopicTags(String srcTopicTags) {
this.srcTopicTags = srcTopicTags;
}

public String getSrcTopics() {
return srcTopics;
}

public void setSrcTopics(String srcTopics) {
this.srcTopics = srcTopics;
}

public String getSrcEndpoint() {
return srcEndpoint;
}
Expand Down

0 comments on commit 47d47e5

Please sign in to comment.