Skip to content

Commit

Permalink
fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Mar 8, 2023
1 parent 3431b4b commit 1b9fd88
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ public class DistributedConnectStartup {
public static Properties properties = null;

public static void main(String[] args) {
args = new String[]{"-c /Users/sunxiaojian/work/test-config/connect-standalone-copy.conf"};
start(createConnectController(args));
}

Expand Down Expand Up @@ -132,16 +131,16 @@ private static DistributedConnectController createConnectController(String[] arg
in.close();
}

// if (null == config.getConnectHome()) {
// System.out.printf("Please set the %s variable in your environment to match the location of the Connect installation", WorkerConfig.CONNECT_HOME_ENV);
// System.exit(-2);
// }
//
// LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
// JoranConfigurator configurator = new JoranConfigurator();
// configurator.setContext(lc);
// lc.reset();
// configurator.doConfigure(config.getConnectHome() + "/conf/logback.xml");
if (null == config.getConnectHome()) {
System.out.printf("Please set the %s variable in your environment to match the location of the Connect installation", WorkerConfig.CONNECT_HOME_ENV);
System.exit(-2);
}

LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
configurator.doConfigure(config.getConnectHome() + "/conf/logback.xml");

List<String> pluginPaths = new ArrayList<>(16);
if (StringUtils.isNotEmpty(config.getPluginPaths())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@

import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.component.connector.Connector;
import org.apache.commons.collections.CollectionUtils;
import org.apache.rocketmq.connect.common.constant.LoggerName;
import org.apache.rocketmq.connect.runtime.common.ConfigException;
import io.openmessaging.connector.api.component.task.sink.SinkConnector;
import io.openmessaging.connector.api.component.task.source.SourceConnector;
import io.openmessaging.connector.api.data.RecordConverter;
Expand All @@ -30,7 +27,18 @@
import io.openmessaging.connector.api.data.SchemaBuilder;
import io.openmessaging.connector.api.data.Struct;
import io.openmessaging.connector.api.errors.ConnectException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.connect.common.constant.LoggerName;
import org.apache.rocketmq.connect.runtime.common.ConfigException;
import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
import org.apache.rocketmq.connect.runtime.config.ConnectorConfig;
import org.apache.rocketmq.connect.runtime.config.SinkConnectorConfig;
Expand All @@ -49,15 +57,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import static org.apache.rocketmq.connect.runtime.config.ConnectorConfig.CONNECTOR_CLASS;

/**
Expand All @@ -72,53 +71,52 @@ public abstract class AbstractConfigManagementService implements ConfigManagemen
public static final String DELETE_CONNECTOR_PREFIX = "delete-";
protected static final String FIELD_STATE = "state";
protected static final String FIELD_EPOCH = "epoch";
protected static final String FIELD_PROPS = "properties";
protected static final String FIELD_DELETED = "deleted";
/**
* delete connector V0
*/
@Deprecated
public static final Schema CONNECTOR_DELETE_CONFIGURATION_V0 = SchemaBuilder.struct()
.field(FIELD_EPOCH, SchemaBuilder.int64().build())
.build();

/**
* delete connector V1
*/
public static final Schema CONNECTOR_DELETE_CONFIGURATION_V1 = SchemaBuilder.struct()
.field(FIELD_EPOCH, SchemaBuilder.int64().build())
.field(FIELD_DELETED, SchemaBuilder.bool().build())
.build();
.field(FIELD_EPOCH, SchemaBuilder.int64().build())
.build();
/**
* connector state
*/
public static final Schema TARGET_STATE_V0 = SchemaBuilder.struct()
.field(FIELD_STATE, SchemaBuilder.string().build())
.field(FIELD_EPOCH, SchemaBuilder.int64().build())
.build();
.field(FIELD_STATE, SchemaBuilder.string().build())
.field(FIELD_EPOCH, SchemaBuilder.int64().build())
.build();
protected static final String FIELD_PROPS = "properties";
/**
* connector configuration
*/
public static final Schema CONNECTOR_CONFIGURATION_V0 = SchemaBuilder.struct()
.field(FIELD_STATE, SchemaBuilder.string().build())
.field(FIELD_EPOCH, SchemaBuilder.int64().build())
.field(FIELD_PROPS,
SchemaBuilder.map(
SchemaBuilder.string().optional().build(),
SchemaBuilder.string().optional().build()
).build())
.build();
.field(FIELD_STATE, SchemaBuilder.string().build())
.field(FIELD_EPOCH, SchemaBuilder.int64().build())
.field(FIELD_PROPS,
SchemaBuilder.map(
SchemaBuilder.string().optional().build(),
SchemaBuilder.string().optional().build()
).build())
.build();
/**
* task configuration
*/
public static final Schema TASK_CONFIGURATION_V0 = SchemaBuilder.struct()
.field(FIELD_EPOCH, SchemaBuilder.int64().build())
.field(FIELD_PROPS,
SchemaBuilder.map(
SchemaBuilder.string().build(),
SchemaBuilder.string().optional().build()
).build())
.build();
.field(FIELD_EPOCH, SchemaBuilder.int64().build())
.field(FIELD_PROPS,
SchemaBuilder.map(
SchemaBuilder.string().build(),
SchemaBuilder.string().optional().build()
).build())
.build();
protected static final String FIELD_DELETED = "deleted";
/**
* delete connector V1
*/
public static final Schema CONNECTOR_DELETE_CONFIGURATION_V1 = SchemaBuilder.struct()
.field(FIELD_EPOCH, SchemaBuilder.int64().build())
.field(FIELD_DELETED, SchemaBuilder.bool().build())
.build();
private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
protected final String configManagePrefix = "ConfigManage";
/**
Expand Down Expand Up @@ -172,7 +170,6 @@ public void initialize(WorkerConfig workerConfig, RecordConverter converter, Plu
this.dataSynchronizer = initializationDataSynchronizer(workerConfig);
}


@Override
public boolean enabledCompactTopic() {
return false;
Expand Down Expand Up @@ -306,7 +303,6 @@ protected void putTaskConfigs(String connectorName, List<ConnectKeyValue> config
taskKeyValueStore.put(connectorName, configs);
}


@Override
public void recomputeTaskConfigs(String connectorName, ConnectKeyValue configs) {
int maxTask = configs.getInt(ConnectorConfig.MAX_TASK, ConnectorConfig.TASKS_MAX_DEFAULT);
Expand Down Expand Up @@ -394,7 +390,6 @@ public ClusterConfigState snapshot() {
return new ClusterConfigState(connectorTaskCounts, connectorConfigs, connectorTargetStates, connectorTaskConfigs);
}


@Override
public Plugin getPlugin() {
return this.plugin;
Expand All @@ -418,7 +413,6 @@ public void triggerListener() {
}
}


// ======= Start receives the config message and transforms the storage ======

protected void process(String key, SchemaAndValue schemaAndValue) {
Expand Down Expand Up @@ -469,7 +463,7 @@ private void processDeleteConnectorRecord(String connectorName, SchemaAndValue s
// config update
if ((Long) epoch > oldConfig.getEpoch()) {
// remove
if (connectorKeyValueStore.containsKey(connectorName)){
if (connectorKeyValueStore.containsKey(connectorName)) {
connectorKeyValueStore.remove(connectorName);
}
if (taskKeyValueStore.containsKey(connectorName)) {
Expand Down Expand Up @@ -520,14 +514,14 @@ private void processTargetStateRecord(String connectorName, SchemaAndValue schem
if (!(targetState instanceof String)) {
// target state
log.error("Invalid data for target state for connector '{}': 'state' field should be a String but is {}",
connectorName, className(targetState));
connectorName, className(targetState));
return;
}
Object epoch = struct.get(FIELD_EPOCH);
if (!(epoch instanceof Long)) {
// epoch
log.error("Invalid data for epoch for connector '{}': 'epoch' field should be a Long but is {}",
connectorName, className(epoch));
connectorName, className(epoch));
return;
}

Expand Down Expand Up @@ -555,21 +549,21 @@ private boolean mergeConnectConfig(String connectName, SchemaAndValue schemaAndV
if (!(targetState instanceof String)) {
// target state
log.error("Invalid data for target state for connector '{}': 'state' field should be a String but is {}",
connectName, className(targetState));
connectName, className(targetState));
return false;
}
Object epoch = value.get(FIELD_EPOCH);
if (!(epoch instanceof Long)) {
// epoch
log.error("Invalid data for epoch for connector '{}': 'state' field should be a long but is {}",
connectName, className(epoch));
connectName, className(epoch));
return false;
}
Object props = value.get(FIELD_PROPS);
if (!(props instanceof Map)) {
// properties
log.error("Invalid data for properties for connector '{}': 'state' field should be a Map but is {}",
connectName, className(props));
connectName, className(props));
return false;
}
// new configs
Expand Down

0 comments on commit 1b9fd88

Please sign in to comment.