Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

alter rmq replicator unnecessary validation #460

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.Map;
import java.util.UUID;

public class ReplicatorCheckpointConnector extends SourceConnector {
Expand Down Expand Up @@ -89,21 +89,21 @@ public Class<? extends Task> taskClass() {
}


private Set<String> neededParamKeys = new HashSet<String>() {
private Map<String, Boolean> neededParamKeys = new HashMap<String, Boolean>() {
{
add(ReplicatorConnectorConfig.SRC_CLOUD);
add(ReplicatorConnectorConfig.SRC_REGION);
add(ReplicatorConnectorConfig.SRC_CLUSTER);
add(ReplicatorConnectorConfig.SRC_ENDPOINT);
add(ReplicatorConnectorConfig.SRC_TOPICS);
add(ReplicatorConnectorConfig.DEST_CLOUD);
add(ReplicatorConnectorConfig.DEST_REGION);
add(ReplicatorConnectorConfig.DEST_CLUSTER);
add(ReplicatorConnectorConfig.DEST_ENDPOINT);
add(ReplicatorConnectorConfig.SRC_CLOUD);
add(ReplicatorConnectorConfig.SYNC_GIDS);
add(ReplicatorConnectorConfig.SRC_ACL_ENABLE);
add(ReplicatorConnectorConfig.DEST_ACL_ENABLE);
put(ReplicatorConnectorConfig.SRC_CLOUD, false);
put(ReplicatorConnectorConfig.SRC_REGION, false);
put(ReplicatorConnectorConfig.SRC_CLUSTER, false);
put(ReplicatorConnectorConfig.SRC_ENDPOINT, true);
put(ReplicatorConnectorConfig.SRC_TOPICS, true);
put(ReplicatorConnectorConfig.DEST_CLOUD, false);
put(ReplicatorConnectorConfig.DEST_REGION, false);
put(ReplicatorConnectorConfig.DEST_CLUSTER, false);
put(ReplicatorConnectorConfig.DEST_ENDPOINT, true);
put(ReplicatorConnectorConfig.SRC_CLOUD, false);
put(ReplicatorConnectorConfig.SYNC_GIDS, true);
put(ReplicatorConnectorConfig.SRC_ACL_ENABLE, false);
put(ReplicatorConnectorConfig.DEST_ACL_ENABLE, false);
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@
import org.apache.rocketmq.replicator.utils.ReplicatorUtils;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.Map;

import static org.apache.rocketmq.connect.runtime.config.SourceConnectorConfig.CONNECT_TOPICNAME;

Expand Down Expand Up @@ -59,23 +59,23 @@ public Class<? extends Task> taskClass() {
return ReplicatorHeartbeatTask.class;
}

private Set<String> neededParamKeys = new HashSet<String>() {
private Map<String, Boolean> neededParamKeys = new HashMap<String, Boolean>() {
{
add(ReplicatorConnectorConfig.SRC_CLOUD);
add(ReplicatorConnectorConfig.SRC_REGION);
add(ReplicatorConnectorConfig.SRC_CLUSTER);
add(ReplicatorConnectorConfig.SRC_INSTANCEID);
add(ReplicatorConnectorConfig.SRC_ENDPOINT);
add(ReplicatorConnectorConfig.SRC_TOPICTAGS);
add(ReplicatorConnectorConfig.DEST_CLOUD);
add(ReplicatorConnectorConfig.DEST_REGION);
add(ReplicatorConnectorConfig.DEST_CLUSTER);
add(ReplicatorConnectorConfig.DEST_INSTANCEID);
add(ReplicatorConnectorConfig.DEST_ENDPOINT);
add(ReplicatorConnectorConfig.DEST_TOPIC);
add(ReplicatorConnectorConfig.SRC_CLOUD);
add(ReplicatorConnectorConfig.SRC_ACL_ENABLE);
add(ReplicatorConnectorConfig.DEST_ACL_ENABLE);
put(ReplicatorConnectorConfig.SRC_CLOUD, false);
put(ReplicatorConnectorConfig.SRC_REGION, false);
put(ReplicatorConnectorConfig.SRC_CLUSTER, false);
put(ReplicatorConnectorConfig.SRC_INSTANCEID, false);
put(ReplicatorConnectorConfig.SRC_ENDPOINT, true);
put(ReplicatorConnectorConfig.SRC_TOPICTAGS, true);
put(ReplicatorConnectorConfig.DEST_CLOUD, false);
put(ReplicatorConnectorConfig.DEST_REGION, false);
put(ReplicatorConnectorConfig.DEST_CLUSTER, false);
put(ReplicatorConnectorConfig.DEST_INSTANCEID, false);
put(ReplicatorConnectorConfig.DEST_ENDPOINT, true);
put(ReplicatorConnectorConfig.DEST_TOPIC, true);
put(ReplicatorConnectorConfig.SRC_CLOUD, false);
put(ReplicatorConnectorConfig.SRC_ACL_ENABLE, false);
put(ReplicatorConnectorConfig.DEST_ACL_ENABLE, false);
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,22 +208,22 @@ public Class<? extends Task> taskClass() {
return ReplicatorSourceTask.class;
}

private Set<String> neededParamKeys = new HashSet<String>() {
private Map<String, Boolean> neededParamKeys = new HashMap<String, Boolean>() {
{
add(ReplicatorConnectorConfig.SRC_CLOUD);
add(ReplicatorConnectorConfig.SRC_REGION);
add(ReplicatorConnectorConfig.SRC_CLUSTER);
add(ReplicatorConnectorConfig.SRC_ENDPOINT);
add(ReplicatorConnectorConfig.SRC_TOPICTAGS);
add(ReplicatorConnectorConfig.DEST_CLOUD);
add(ReplicatorConnectorConfig.DEST_REGION);
add(ReplicatorConnectorConfig.DEST_CLUSTER);
add(ReplicatorConnectorConfig.DEST_ENDPOINT);
add(ReplicatorConnectorConfig.DEST_TOPIC);
add(ReplicatorConnectorConfig.SRC_CLOUD);
add(ReplicatorConnectorConfig.SRC_ACL_ENABLE);
add(ReplicatorConnectorConfig.DEST_ACL_ENABLE);
add(ERRORS_TOLERANCE_CONFIG);
put(ReplicatorConnectorConfig.SRC_CLOUD, false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it is not a necessary parameter, can you consider removing it from the verification?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is also possible to remove them directly, but it is easier for me to extend or customize them. It would be better to extract the required parameters into a unified public class rather than write them for each connector

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my opinion, after the connector is implemented, the necessary configuration has been determined. In the subsequent evolution and development, of course, it is possible to challenge the verification rules of these parameters, but it is a very low-frequency operation. For example, we have some differences now. However, after our discussion this time, after confirming those that are indeed required and those that are not, there may be basically no major changes.
I think what you said about unifying the necessary parameters is a good idea. However, each connector implementation may have different validation rules, not only necessary and non-essential parameters. Can you try to tidy up some parameters though? Let's do further discussions on specific parameters?

put(ReplicatorConnectorConfig.SRC_REGION, false);
put(ReplicatorConnectorConfig.SRC_CLUSTER, false);
put(ReplicatorConnectorConfig.SRC_ENDPOINT, true);
put(ReplicatorConnectorConfig.SRC_TOPICTAGS, true);
put(ReplicatorConnectorConfig.DEST_CLOUD, false);
put(ReplicatorConnectorConfig.DEST_REGION, false);
put(ReplicatorConnectorConfig.DEST_CLUSTER, false);
put(ReplicatorConnectorConfig.DEST_ENDPOINT, true);
put(ReplicatorConnectorConfig.DEST_TOPIC, true);
put(ReplicatorConnectorConfig.SRC_CLOUD, false);
put(ReplicatorConnectorConfig.SRC_ACL_ENABLE, false);
put(ReplicatorConnectorConfig.DEST_ACL_ENABLE, false);
put(ERRORS_TOLERANCE_CONFIG, false);
}
};

Expand All @@ -239,7 +239,7 @@ public void validate(KeyValue config) {
ReplicatorUtils.checkNeedParams(ReplicatorSourceConnector.class.getName(), config, neededParamKeys);
String consumeFromWhere = config.getString(ReplicatorConnectorConfig.CONSUME_FROM_WHERE, ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET.name());
if (StringUtils.isNotBlank(consumeFromWhere) && consumeFromWhere.equals(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP.name())) {
ReplicatorUtils.checkNeedParamNotEmpty(ReplicatorSourceConnector.class.getName(), config, ReplicatorConnectorConfig.CONSUME_FROM_TIMESTAMP);
ReplicatorUtils.checkNeedParamNotEmpty(ReplicatorSourceConnector.class.getName(), config, ReplicatorConnectorConfig.CONSUME_FROM_TIMESTAMP, true);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,14 @@ public static String buildConsumergroupWithNamespace(String consumerGroup, Strin
return instanceId + "%" + consumerGroup;
}

public static void checkNeedParams(String connectorName, KeyValue config, Set<String> neededParamKeys) {
for (String needParamKey : neededParamKeys) {
checkNeedParamNotEmpty(connectorName, config, needParamKey);
public static void checkNeedParams(String connectorName, KeyValue config, Map<String, Boolean> neededParamKeys) {
for (String needParamKey : neededParamKeys.keySet()) {
checkNeedParamNotEmpty(connectorName, config, needParamKey, neededParamKeys.get(needParamKey));
}
}

public static void checkNeedParamNotEmpty(String connectorName, KeyValue config, String needParamKey) {
if (StringUtils.isEmpty(config.getString(needParamKey, ""))) {
public static void checkNeedParamNotEmpty(String connectorName, KeyValue config, String needParamKey, boolean isNeeded) {
if (StringUtils.isEmpty(config.getString(needParamKey, "")) && isNeeded) {
log.error("Replicator connector " + connectorName + " do not set " + needParamKey);
throw new ParamInvalidException("Replicator connector " + connectorName + " do not set " + needParamKey);
}
Expand Down