From e282319311f03275a27a00407740d790a120b8d9 Mon Sep 17 00:00:00 2001 From: wangkai Date: Wed, 5 Apr 2023 20:25:08 +0800 Subject: [PATCH 1/2] 1.alter unnecessary validation --- .../ReplicatorCheckpointConnector.java | 32 ++++++++--------- .../ReplicatorHeartbeatConnector.java | 36 +++++++++---------- .../replicator/ReplicatorSourceConnector.java | 32 ++++++++--------- .../replicator/utils/ReplicatorUtils.java | 8 ++--- 4 files changed, 54 insertions(+), 54 deletions(-) diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointConnector.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointConnector.java index c1dbdc656..9a263b057 100644 --- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointConnector.java +++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorCheckpointConnector.java @@ -39,9 +39,9 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; -import java.util.HashSet; +import java.util.HashMap; import java.util.List; -import java.util.Set; +import java.util.Map; import java.util.UUID; public class ReplicatorCheckpointConnector extends SourceConnector { @@ -89,21 +89,21 @@ public Class taskClass() { } - private Set neededParamKeys = new HashSet() { + private Map neededParamKeys = new HashMap() { { - add(ReplicatorConnectorConfig.SRC_CLOUD); - add(ReplicatorConnectorConfig.SRC_REGION); - add(ReplicatorConnectorConfig.SRC_CLUSTER); - add(ReplicatorConnectorConfig.SRC_ENDPOINT); - add(ReplicatorConnectorConfig.SRC_TOPICS); - add(ReplicatorConnectorConfig.DEST_CLOUD); - add(ReplicatorConnectorConfig.DEST_REGION); - add(ReplicatorConnectorConfig.DEST_CLUSTER); - add(ReplicatorConnectorConfig.DEST_ENDPOINT); - add(ReplicatorConnectorConfig.SRC_CLOUD); - add(ReplicatorConnectorConfig.SYNC_GIDS); - add(ReplicatorConnectorConfig.SRC_ACL_ENABLE); - add(ReplicatorConnectorConfig.DEST_ACL_ENABLE); + put(ReplicatorConnectorConfig.SRC_CLOUD, false); + put(ReplicatorConnectorConfig.SRC_REGION, false); + put(ReplicatorConnectorConfig.SRC_CLUSTER, false); + put(ReplicatorConnectorConfig.SRC_ENDPOINT, true); + put(ReplicatorConnectorConfig.SRC_TOPICS, true); + put(ReplicatorConnectorConfig.DEST_CLOUD, false); + put(ReplicatorConnectorConfig.DEST_REGION, false); + put(ReplicatorConnectorConfig.DEST_CLUSTER, false); + put(ReplicatorConnectorConfig.DEST_ENDPOINT, true); + put(ReplicatorConnectorConfig.SRC_CLOUD, false); + put(ReplicatorConnectorConfig.SYNC_GIDS, true); + put(ReplicatorConnectorConfig.SRC_ACL_ENABLE, false); + put(ReplicatorConnectorConfig.DEST_ACL_ENABLE, false); } }; diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorHeartbeatConnector.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorHeartbeatConnector.java index 36520bfd1..26e1fed8b 100644 --- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorHeartbeatConnector.java +++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorHeartbeatConnector.java @@ -27,9 +27,9 @@ import org.apache.rocketmq.replicator.utils.ReplicatorUtils; import java.util.ArrayList; -import java.util.HashSet; +import java.util.HashMap; import java.util.List; -import java.util.Set; +import java.util.Map; import static org.apache.rocketmq.connect.runtime.config.SourceConnectorConfig.CONNECT_TOPICNAME; @@ -59,23 +59,23 @@ public Class taskClass() { return ReplicatorHeartbeatTask.class; } - private Set neededParamKeys = new HashSet() { + private Map neededParamKeys = new HashMap() { { - add(ReplicatorConnectorConfig.SRC_CLOUD); - add(ReplicatorConnectorConfig.SRC_REGION); - add(ReplicatorConnectorConfig.SRC_CLUSTER); - add(ReplicatorConnectorConfig.SRC_INSTANCEID); - add(ReplicatorConnectorConfig.SRC_ENDPOINT); - add(ReplicatorConnectorConfig.SRC_TOPICTAGS); - add(ReplicatorConnectorConfig.DEST_CLOUD); - add(ReplicatorConnectorConfig.DEST_REGION); - add(ReplicatorConnectorConfig.DEST_CLUSTER); - add(ReplicatorConnectorConfig.DEST_INSTANCEID); - add(ReplicatorConnectorConfig.DEST_ENDPOINT); - add(ReplicatorConnectorConfig.DEST_TOPIC); - add(ReplicatorConnectorConfig.SRC_CLOUD); - add(ReplicatorConnectorConfig.SRC_ACL_ENABLE); - add(ReplicatorConnectorConfig.DEST_ACL_ENABLE); + put(ReplicatorConnectorConfig.SRC_CLOUD, false); + put(ReplicatorConnectorConfig.SRC_REGION, false); + put(ReplicatorConnectorConfig.SRC_CLUSTER, false); + put(ReplicatorConnectorConfig.SRC_INSTANCEID, false); + put(ReplicatorConnectorConfig.SRC_ENDPOINT, true); + put(ReplicatorConnectorConfig.SRC_TOPICTAGS, true); + put(ReplicatorConnectorConfig.DEST_CLOUD, false); + put(ReplicatorConnectorConfig.DEST_REGION, false); + put(ReplicatorConnectorConfig.DEST_CLUSTER, false); + put(ReplicatorConnectorConfig.DEST_INSTANCEID, false); + put(ReplicatorConnectorConfig.DEST_ENDPOINT, true); + put(ReplicatorConnectorConfig.DEST_TOPIC, true); + put(ReplicatorConnectorConfig.SRC_CLOUD, false); + put(ReplicatorConnectorConfig.SRC_ACL_ENABLE, false); + put(ReplicatorConnectorConfig.DEST_ACL_ENABLE, false); } }; diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceConnector.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceConnector.java index efd34e1ae..15e88e113 100644 --- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceConnector.java +++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/ReplicatorSourceConnector.java @@ -208,22 +208,22 @@ public Class taskClass() { return ReplicatorSourceTask.class; } - private Set neededParamKeys = new HashSet() { + private Map neededParamKeys = new HashMap() { { - add(ReplicatorConnectorConfig.SRC_CLOUD); - add(ReplicatorConnectorConfig.SRC_REGION); - add(ReplicatorConnectorConfig.SRC_CLUSTER); - add(ReplicatorConnectorConfig.SRC_ENDPOINT); - add(ReplicatorConnectorConfig.SRC_TOPICTAGS); - add(ReplicatorConnectorConfig.DEST_CLOUD); - add(ReplicatorConnectorConfig.DEST_REGION); - add(ReplicatorConnectorConfig.DEST_CLUSTER); - add(ReplicatorConnectorConfig.DEST_ENDPOINT); - add(ReplicatorConnectorConfig.DEST_TOPIC); - add(ReplicatorConnectorConfig.SRC_CLOUD); - add(ReplicatorConnectorConfig.SRC_ACL_ENABLE); - add(ReplicatorConnectorConfig.DEST_ACL_ENABLE); - add(ERRORS_TOLERANCE_CONFIG); + put(ReplicatorConnectorConfig.SRC_CLOUD, false); + put(ReplicatorConnectorConfig.SRC_REGION, false); + put(ReplicatorConnectorConfig.SRC_CLUSTER, false); + put(ReplicatorConnectorConfig.SRC_ENDPOINT, true); + put(ReplicatorConnectorConfig.SRC_TOPICTAGS, true); + put(ReplicatorConnectorConfig.DEST_CLOUD, false); + put(ReplicatorConnectorConfig.DEST_REGION, false); + put(ReplicatorConnectorConfig.DEST_CLUSTER, false); + put(ReplicatorConnectorConfig.DEST_ENDPOINT, true); + put(ReplicatorConnectorConfig.DEST_TOPIC, true); + put(ReplicatorConnectorConfig.SRC_CLOUD, false); + put(ReplicatorConnectorConfig.SRC_ACL_ENABLE, false); + put(ReplicatorConnectorConfig.DEST_ACL_ENABLE, false); + put(ERRORS_TOLERANCE_CONFIG, false); } }; @@ -239,7 +239,7 @@ public void validate(KeyValue config) { ReplicatorUtils.checkNeedParams(ReplicatorSourceConnector.class.getName(), config, neededParamKeys); String consumeFromWhere = config.getString(ReplicatorConnectorConfig.CONSUME_FROM_WHERE, ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET.name()); if (StringUtils.isNotBlank(consumeFromWhere) && consumeFromWhere.equals(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP.name())) { - ReplicatorUtils.checkNeedParamNotEmpty(ReplicatorSourceConnector.class.getName(), config, ReplicatorConnectorConfig.CONSUME_FROM_TIMESTAMP); + ReplicatorUtils.checkNeedParamNotEmpty(ReplicatorSourceConnector.class.getName(), config, ReplicatorConnectorConfig.CONSUME_FROM_TIMESTAMP, true); } } diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/utils/ReplicatorUtils.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/utils/ReplicatorUtils.java index fd28a9b73..9f5baca24 100644 --- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/utils/ReplicatorUtils.java +++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/utils/ReplicatorUtils.java @@ -63,13 +63,13 @@ public static String buildConsumergroupWithNamespace(String consumerGroup, Strin return instanceId + "%" + consumerGroup; } - public static void checkNeedParams(String connectorName, KeyValue config, Set neededParamKeys) { - for (String needParamKey : neededParamKeys) { - checkNeedParamNotEmpty(connectorName, config, needParamKey); + public static void checkNeedParams(String connectorName, KeyValue config, Map neededParamKeys) { + for (String needParamKey : neededParamKeys.keySet()) { + checkNeedParamNotEmpty(connectorName, config, needParamKey, neededParamKeys.get(needParamKey)); } } - public static void checkNeedParamNotEmpty(String connectorName, KeyValue config, String needParamKey) { + public static void checkNeedParamNotEmpty(String connectorName, KeyValue config, String needParamKey, boolean isNeeded) { if (StringUtils.isEmpty(config.getString(needParamKey, ""))) { log.error("Replicator connector " + connectorName + " do not set " + needParamKey); throw new ParamInvalidException("Replicator connector " + connectorName + " do not set " + needParamKey); From 6feae1ebdd16fcfb7aceea8a34e1c247fd560b2f Mon Sep 17 00:00:00 2001 From: wangkai Date: Wed, 5 Apr 2023 23:22:16 +0800 Subject: [PATCH 2/2] alter rmq replicator unnecessary validation --- .../org/apache/rocketmq/replicator/utils/ReplicatorUtils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/utils/ReplicatorUtils.java b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/utils/ReplicatorUtils.java index 9f5baca24..e003cdaae 100644 --- a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/utils/ReplicatorUtils.java +++ b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/utils/ReplicatorUtils.java @@ -70,7 +70,7 @@ public static void checkNeedParams(String connectorName, KeyValue config, Map