diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointConnector.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointConnector.java index c1dbdc65..9a263b05 100644 --- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointConnector.java +++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointConnector.java @@ -39,9 +39,9 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; -import java.util.HashSet; +import java.util.HashMap; import java.util.List; -import java.util.Set; +import java.util.Map; import java.util.UUID; public class ReplicatorCheckpointConnector extends SourceConnector { @@ -89,21 +89,21 @@ public Class taskClass() { } - private Set neededParamKeys = new HashSet() { + private Map neededParamKeys = new HashMap() { { - add(ReplicatorConnectorConfig.SRC_CLOUD); - add(ReplicatorConnectorConfig.SRC_REGION); - add(ReplicatorConnectorConfig.SRC_CLUSTER); - add(ReplicatorConnectorConfig.SRC_ENDPOINT); - add(ReplicatorConnectorConfig.SRC_TOPICS); - add(ReplicatorConnectorConfig.DEST_CLOUD); - add(ReplicatorConnectorConfig.DEST_REGION); - add(ReplicatorConnectorConfig.DEST_CLUSTER); - add(ReplicatorConnectorConfig.DEST_ENDPOINT); - add(ReplicatorConnectorConfig.SRC_CLOUD); - add(ReplicatorConnectorConfig.SYNC_GIDS); - add(ReplicatorConnectorConfig.SRC_ACL_ENABLE); - add(ReplicatorConnectorConfig.DEST_ACL_ENABLE); + put(ReplicatorConnectorConfig.SRC_CLOUD, false); + put(ReplicatorConnectorConfig.SRC_REGION, false); + put(ReplicatorConnectorConfig.SRC_CLUSTER, false); + put(ReplicatorConnectorConfig.SRC_ENDPOINT, true); + put(ReplicatorConnectorConfig.SRC_TOPICS, true); + put(ReplicatorConnectorConfig.DEST_CLOUD, false); + put(ReplicatorConnectorConfig.DEST_REGION, false); + put(ReplicatorConnectorConfig.DEST_CLUSTER, false); + put(ReplicatorConnectorConfig.DEST_ENDPOINT, true); + put(ReplicatorConnectorConfig.SRC_CLOUD, false); + put(ReplicatorConnectorConfig.SYNC_GIDS, true); + put(ReplicatorConnectorConfig.SRC_ACL_ENABLE, false); + put(ReplicatorConnectorConfig.DEST_ACL_ENABLE, false); } }; diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorHeartbeatConnector.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorHeartbeatConnector.java index 36520bfd..26e1fed8 100644 --- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorHeartbeatConnector.java +++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorHeartbeatConnector.java @@ -27,9 +27,9 @@ import org.apache.rocketmq.replicator.utils.ReplicatorUtils; import java.util.ArrayList; -import java.util.HashSet; +import java.util.HashMap; import java.util.List; -import java.util.Set; +import java.util.Map; import static org.apache.rocketmq.connect.runtime.config.SourceConnectorConfig.CONNECT_TOPICNAME; @@ -59,23 +59,23 @@ public Class taskClass() { return ReplicatorHeartbeatTask.class; } - private Set neededParamKeys = new HashSet() { + private Map neededParamKeys = new HashMap() { { - add(ReplicatorConnectorConfig.SRC_CLOUD); - add(ReplicatorConnectorConfig.SRC_REGION); - add(ReplicatorConnectorConfig.SRC_CLUSTER); - add(ReplicatorConnectorConfig.SRC_INSTANCEID); - add(ReplicatorConnectorConfig.SRC_ENDPOINT); - add(ReplicatorConnectorConfig.SRC_TOPICTAGS); - add(ReplicatorConnectorConfig.DEST_CLOUD); - add(ReplicatorConnectorConfig.DEST_REGION); - add(ReplicatorConnectorConfig.DEST_CLUSTER); - add(ReplicatorConnectorConfig.DEST_INSTANCEID); - add(ReplicatorConnectorConfig.DEST_ENDPOINT); - add(ReplicatorConnectorConfig.DEST_TOPIC); - add(ReplicatorConnectorConfig.SRC_CLOUD); - add(ReplicatorConnectorConfig.SRC_ACL_ENABLE); - add(ReplicatorConnectorConfig.DEST_ACL_ENABLE); + put(ReplicatorConnectorConfig.SRC_CLOUD, false); + put(ReplicatorConnectorConfig.SRC_REGION, false); + put(ReplicatorConnectorConfig.SRC_CLUSTER, false); + put(ReplicatorConnectorConfig.SRC_INSTANCEID, false); + put(ReplicatorConnectorConfig.SRC_ENDPOINT, true); + put(ReplicatorConnectorConfig.SRC_TOPICTAGS, true); + put(ReplicatorConnectorConfig.DEST_CLOUD, false); + put(ReplicatorConnectorConfig.DEST_REGION, false); + put(ReplicatorConnectorConfig.DEST_CLUSTER, false); + put(ReplicatorConnectorConfig.DEST_INSTANCEID, false); + put(ReplicatorConnectorConfig.DEST_ENDPOINT, true); + put(ReplicatorConnectorConfig.DEST_TOPIC, true); + put(ReplicatorConnectorConfig.SRC_CLOUD, false); + put(ReplicatorConnectorConfig.SRC_ACL_ENABLE, false); + put(ReplicatorConnectorConfig.DEST_ACL_ENABLE, false); } }; diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceConnector.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceConnector.java index efd34e1a..15e88e11 100644 --- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceConnector.java +++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceConnector.java @@ -208,22 +208,22 @@ public Class taskClass() { return ReplicatorSourceTask.class; } - private Set neededParamKeys = new HashSet() { + private Map neededParamKeys = new HashMap() { { - add(ReplicatorConnectorConfig.SRC_CLOUD); - add(ReplicatorConnectorConfig.SRC_REGION); - add(ReplicatorConnectorConfig.SRC_CLUSTER); - add(ReplicatorConnectorConfig.SRC_ENDPOINT); - add(ReplicatorConnectorConfig.SRC_TOPICTAGS); - add(ReplicatorConnectorConfig.DEST_CLOUD); - add(ReplicatorConnectorConfig.DEST_REGION); - add(ReplicatorConnectorConfig.DEST_CLUSTER); - add(ReplicatorConnectorConfig.DEST_ENDPOINT); - add(ReplicatorConnectorConfig.DEST_TOPIC); - add(ReplicatorConnectorConfig.SRC_CLOUD); - add(ReplicatorConnectorConfig.SRC_ACL_ENABLE); - add(ReplicatorConnectorConfig.DEST_ACL_ENABLE); - add(ERRORS_TOLERANCE_CONFIG); + put(ReplicatorConnectorConfig.SRC_CLOUD, false); + put(ReplicatorConnectorConfig.SRC_REGION, false); + put(ReplicatorConnectorConfig.SRC_CLUSTER, false); + put(ReplicatorConnectorConfig.SRC_ENDPOINT, true); + put(ReplicatorConnectorConfig.SRC_TOPICTAGS, true); + put(ReplicatorConnectorConfig.DEST_CLOUD, false); + put(ReplicatorConnectorConfig.DEST_REGION, false); + put(ReplicatorConnectorConfig.DEST_CLUSTER, false); + put(ReplicatorConnectorConfig.DEST_ENDPOINT, true); + put(ReplicatorConnectorConfig.DEST_TOPIC, true); + put(ReplicatorConnectorConfig.SRC_CLOUD, false); + put(ReplicatorConnectorConfig.SRC_ACL_ENABLE, false); + put(ReplicatorConnectorConfig.DEST_ACL_ENABLE, false); + put(ERRORS_TOLERANCE_CONFIG, false); } }; @@ -239,7 +239,7 @@ public void validate(KeyValue config) { ReplicatorUtils.checkNeedParams(ReplicatorSourceConnector.class.getName(), config, neededParamKeys); String consumeFromWhere = config.getString(ReplicatorConnectorConfig.CONSUME_FROM_WHERE, ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET.name()); if (StringUtils.isNotBlank(consumeFromWhere) && consumeFromWhere.equals(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP.name())) { - ReplicatorUtils.checkNeedParamNotEmpty(ReplicatorSourceConnector.class.getName(), config, ReplicatorConnectorConfig.CONSUME_FROM_TIMESTAMP); + ReplicatorUtils.checkNeedParamNotEmpty(ReplicatorSourceConnector.class.getName(), config, ReplicatorConnectorConfig.CONSUME_FROM_TIMESTAMP, true); } } diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/utils/ReplicatorUtils.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/utils/ReplicatorUtils.java index fd28a9b7..e003cdaa 100644 --- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/utils/ReplicatorUtils.java +++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/utils/ReplicatorUtils.java @@ -63,14 +63,14 @@ public static String buildConsumergroupWithNamespace(String consumerGroup, Strin return instanceId + "%" + consumerGroup; } - public static void checkNeedParams(String connectorName, KeyValue config, Set neededParamKeys) { - for (String needParamKey : neededParamKeys) { - checkNeedParamNotEmpty(connectorName, config, needParamKey); + public static void checkNeedParams(String connectorName, KeyValue config, Map neededParamKeys) { + for (String needParamKey : neededParamKeys.keySet()) { + checkNeedParamNotEmpty(connectorName, config, needParamKey, neededParamKeys.get(needParamKey)); } } - public static void checkNeedParamNotEmpty(String connectorName, KeyValue config, String needParamKey) { - if (StringUtils.isEmpty(config.getString(needParamKey, ""))) { + public static void checkNeedParamNotEmpty(String connectorName, KeyValue config, String needParamKey, boolean isNeeded) { + if (StringUtils.isEmpty(config.getString(needParamKey, "")) && isNeeded) { log.error("Replicator connector " + connectorName + " do not set " + needParamKey); throw new ParamInvalidException("Replicator connector " + connectorName + " do not set " + needParamKey); }