diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/AbstractConnectController.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/AbstractConnectController.java index 83973d22..9d23b4c0 100644 --- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/AbstractConnectController.java +++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/AbstractConnectController.java @@ -225,6 +225,25 @@ public void deleteConnectorConfig(String connectorName) { configManagementService.deleteConnectorConfig(connectorName); } + /** + * Restart the connector with the specified connector name in the cluster. + * + * @param connectorName + */ + public void restartConnector(String connectorName) { + configManagementService.restartConnector(connectorName); + } + + /** + * Restart the task with the specified task name in the cluster. + * + * @param connectorName + * @param task + */ + public void restartTask(String connectorName, Integer task) { + configManagementService.restartTask(connectorName, task); + } + /** * Pause the connector. This call will asynchronously suspend processing by the connector and all * of its tasks. diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java index 352c1ef7..f724b311 100644 --- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java +++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java @@ -82,6 +82,10 @@ public RestHandler(AbstractConnectController connectController) { app.get("/connectors/{connectorName}/stop", this::handleStopConnector); app.get("/connectors/stop/all", this::handleStopAllConnector); + // restart connector + app.get("/connectors/{connectorName}/restart", this::handleRestartConnector); + app.get("/connectors/{connectorName}/tasks/{task}/restart", this::handleRestartTask); + // pause & resume app.get("/connectors/{connectorName}/pause", this::handlePauseConnector); app.get("/connectors/{connectorName}/resume", this::handleResumeConnector); @@ -236,6 +240,28 @@ private void handleStopAllConnector(Context context) { } } + private void handleRestartConnector(Context context) { + try { + String connectorName = context.pathParam(CONNECTOR_NAME); + connectController.restartConnector(connectorName); + context.json(new HttpResponse<>(context.status(), "Connector [" + connectorName + "] restarted successfully")); + } catch (Exception e) { + log.error("Restart connector failed .", e); + context.json(new ErrorMessage(HttpStatus.INTERNAL_SERVER_ERROR_500, e.getMessage())); + } + } + + public void handleRestartTask(Context context) { + try { + String connectorName = context.pathParam(CONNECTOR_NAME); + Integer task = Integer.valueOf(context.pathParam(TASK_NAME)); + connectController.restartTask(connectorName, task); + context.json(new HttpResponse<>(context.status(), "Task [" + connectorName + "/ " + task + "] restarted successfully")); + } catch (Exception ex) { + log.error("Restart task failed .", ex); + context.json(new ErrorMessage(HttpStatus.INTERNAL_SERVER_ERROR_500, ex.getMessage())); + } + } private void handlePauseConnector(Context context) { String connectorName = context.pathParam(CONNECTOR_NAME); diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementService.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementService.java index b128a7df..60d69d1b 100644 --- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementService.java +++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementService.java @@ -76,6 +76,21 @@ default void configure(WorkerConfig config) { */ void deleteConnectorConfig(String connectorName); + /** + * restart the connector with the specified connector name in the cluster. + * + * @param connectorName + */ + void restartConnector(String connectorName); + + /** + * restart the task with the specified task in the cluster. + * + * @param connectorName + * @param task + */ + void restartTask(String connectorName, Integer task); + /** * pause connector * diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java index 9fc07534..795e485c 100644 --- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java +++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java @@ -85,11 +85,20 @@ public static String TASK_KEY(ConnectorTaskId taskId) { } public static final String DELETE_CONNECTOR_PREFIX = "delete-"; + public static final String RESTART_CONNECTOR_PREFIX = "restart-"; public static String DELETE_CONNECTOR_KEY(String connectorName) { return DELETE_CONNECTOR_PREFIX + connectorName; } + public static String RESTART_CONNECTOR_KEY(String connectorName) { + return RESTART_CONNECTOR_PREFIX + connectorName; + } + + public static String RESTART_TASK_KEY(String connectorName, Integer task) { + return RESTART_CONNECTOR_PREFIX + TASK_PREFIX + connectorName + "-" + task; + } + private static final String FIELD_STATE = "state"; private static final String FIELD_EPOCH = "epoch"; private static final String FIELD_PROPS = "properties"; @@ -98,48 +107,62 @@ public static String DELETE_CONNECTOR_KEY(String connectorName) { * start signal */ public static final Schema START_SIGNAL_V0 = SchemaBuilder.struct() - .field(START_SIGNAL, SchemaBuilder.string().build()) - .build(); + .field(START_SIGNAL, SchemaBuilder.string().build()) + .build(); /** * connector configuration */ public static final Schema CONNECTOR_CONFIGURATION_V0 = SchemaBuilder.struct() - .field(FIELD_STATE, SchemaBuilder.string().build()) - .field(FIELD_EPOCH, SchemaBuilder.int64().build()) - .field(FIELD_PROPS, - SchemaBuilder.map( - SchemaBuilder.string().optional().build(), - SchemaBuilder.string().optional().build() - ).build()) - .build(); + .field(FIELD_STATE, SchemaBuilder.string().build()) + .field(FIELD_EPOCH, SchemaBuilder.int64().build()) + .field(FIELD_PROPS, + SchemaBuilder.map( + SchemaBuilder.string().optional().build(), + SchemaBuilder.string().optional().build() + ).build()) + .build(); /** * delete connector */ public static final Schema CONNECTOR_DELETE_CONFIGURATION_V0 = SchemaBuilder.struct() - .field(FIELD_EPOCH, SchemaBuilder.int64().build()) - .build(); + .field(FIELD_EPOCH, SchemaBuilder.int64().build()) + .build(); + + /** + * restart connector + */ + public static final Schema CONNECTOR_RESTART_CONFIGURATION_V0 = SchemaBuilder.struct() + .field(FIELD_EPOCH, SchemaBuilder.int64().build()) + .build(); + + /** + * restart task + */ + public static final Schema TASK_RESTART_CONFIGURATION_V0 = SchemaBuilder.struct() + .field(FIELD_EPOCH, SchemaBuilder.int64().build()) + .build(); /** * task configuration */ public static final Schema TASK_CONFIGURATION_V0 = SchemaBuilder.struct() - .field(FIELD_EPOCH, SchemaBuilder.int64().build()) - .field(FIELD_PROPS, - SchemaBuilder.map( - SchemaBuilder.string().build(), - SchemaBuilder.string().optional().build() - ).build()) - .build(); + .field(FIELD_EPOCH, SchemaBuilder.int64().build()) + .field(FIELD_PROPS, + SchemaBuilder.map( + SchemaBuilder.string().build(), + SchemaBuilder.string().optional().build() + ).build()) + .build(); /** * connector state */ public static final Schema TARGET_STATE_V0 = SchemaBuilder.struct() - .field(FIELD_STATE, SchemaBuilder.string().build()) - .field(FIELD_EPOCH, SchemaBuilder.int64().build()) - .build(); + .field(FIELD_STATE, SchemaBuilder.string().build()) + .field(FIELD_EPOCH, SchemaBuilder.int64().build()) + .build(); /** * All listeners to trigger while config change. @@ -171,24 +194,24 @@ public void initialize(WorkerConfig workerConfig, RecordConverter converter, Plu this.connectorConfigUpdateListener = new HashSet<>(); this.dataSynchronizer = new BrokerBasedLog<>(workerConfig, - this.topic, - ConnectUtil.createGroupName(configManagePrefix, workerConfig.getWorkerId()), - new ConfigChangeCallback(), - Serdes.serdeFrom(String.class), - Serdes.serdeFrom(byte[].class) + this.topic, + ConnectUtil.createGroupName(configManagePrefix, workerConfig.getWorkerId()), + new ConfigChangeCallback(), + Serdes.serdeFrom(String.class), + Serdes.serdeFrom(byte[].class) ); // store connector config this.connectorKeyValueStore = new FileBaseKeyValueStore<>( - FilePathConfigUtil.getConnectorConfigPath(workerConfig.getStorePathRootDir()), - new Serdes.StringSerde(), - new JsonSerde(ConnectKeyValue.class)); + FilePathConfigUtil.getConnectorConfigPath(workerConfig.getStorePathRootDir()), + new Serdes.StringSerde(), + new JsonSerde(ConnectKeyValue.class)); // store task config this.taskKeyValueStore = new FileBaseKeyValueStore<>( - FilePathConfigUtil.getTaskConfigPath(workerConfig.getStorePathRootDir()), - new Serdes.StringSerde(), - new ListSerde(ConnectKeyValue.class)); + FilePathConfigUtil.getTaskConfigPath(workerConfig.getStorePathRootDir()), + new Serdes.StringSerde(), + new ListSerde(ConnectKeyValue.class)); this.prepare(workerConfig); } @@ -290,6 +313,45 @@ public void deleteConnectorConfig(String connectorName) { dataSynchronizer.send(DELETE_CONNECTOR_KEY(connectorName), config); } + /** + * restart connector config + * + * @param connectorName + */ + @Override + public void restartConnector(String connectorName) { + if (!connectorKeyValueStore.containsKey(connectorName)) { + throw new ConnectException("Connector [" + connectorName + "] does not exist"); + } + // new struct + Struct struct = new Struct(CONNECTOR_RESTART_CONFIGURATION_V0); + struct.put(FIELD_EPOCH, System.currentTimeMillis()); + + byte[] config = converter.fromConnectData(topic, CONNECTOR_RESTART_CONFIGURATION_V0, struct); + dataSynchronizer.send(RESTART_CONNECTOR_KEY(connectorName), config); + } + + /** + * restart task config + * + * @param connectorName + * @param task + */ + @Override + public void restartTask(String connectorName, Integer task) { + if (!connectorKeyValueStore.containsKey(connectorName)) { + throw new ConnectException("Connector [" + connectorName + "] does not exist"); + } else if (!taskKeyValueStore.containsKey(connectorName)) { + throw new ConnectException("Task [" + connectorName + "/" + task + "] does not exist"); + } + // new struct + Struct struct = new Struct(TASK_RESTART_CONFIGURATION_V0); + struct.put(FIELD_EPOCH, System.currentTimeMillis()); + + byte[] config = converter.fromConnectData(topic, TASK_RESTART_CONFIGURATION_V0, struct); + dataSynchronizer.send(RESTART_TASK_KEY(connectorName, task), config); + } + /** * pause connector * @@ -377,9 +439,9 @@ private void triggerSendMessage() { configs.setConnectorConfigs(connectorKeyValueStore.getKVMap()); connectorKeyValueStore.getKVMap().forEach((connectName, connectKeyValue) -> { Struct struct = new Struct(CONNECTOR_CONFIGURATION_V0) - .put(FIELD_EPOCH, connectKeyValue.getEpoch()) - .put(FIELD_STATE, connectKeyValue.getTargetState().name()) - .put(FIELD_PROPS, connectKeyValue.getProperties()); + .put(FIELD_EPOCH, connectKeyValue.getEpoch()) + .put(FIELD_STATE, connectKeyValue.getTargetState().name()) + .put(FIELD_PROPS, connectKeyValue.getProperties()); byte[] body = converter.fromConnectData(topic, CONNECTOR_CONFIGURATION_V0, struct); dataSynchronizer.send(CONNECTOR_KEY(connectName), body); }); @@ -391,8 +453,8 @@ private void triggerSendMessage() { taskConfigs.forEach(taskConfig -> { ConnectorTaskId taskId = new ConnectorTaskId(connectName, taskConfig.getInt(ConnectorConfig.TASK_ID)); Struct struct = new Struct(TASK_CONFIGURATION_V0) - .put(FIELD_EPOCH, System.currentTimeMillis()) - .put(FIELD_PROPS, taskConfig.getProperties()); + .put(FIELD_EPOCH, System.currentTimeMillis()) + .put(FIELD_PROPS, taskConfig.getProperties()); byte[] body = converter.fromConnectData(topic, TASK_CONFIGURATION_V0, struct); dataSynchronizer.send(TASK_KEY(taskId), body); }); @@ -433,6 +495,18 @@ public void onCompletion(Throwable error, String key, byte[] value) { String connectorName = key.substring(DELETE_CONNECTOR_PREFIX.length()); processDeleteConnectorRecord(connectorName, schemaAndValue); + } else if (key.startsWith(RESTART_CONNECTOR_PREFIX)) { + if (key.contains(TASK_PREFIX)) { + Integer lastIndex = key.lastIndexOf("-"); + // restart task + String connectorName = key.substring(RESTART_CONNECTOR_PREFIX.length() + TASK_PREFIX.length(), lastIndex); + String taskNum = key.substring(lastIndex + 1); + processRestartTaskRecord(connectorName, taskNum, schemaAndValue); + } else { + // restart connector + String connectorName = key.substring(RESTART_CONNECTOR_PREFIX.length()); + processRestartConnectorRecord(connectorName, schemaAndValue); + } } else { log.error("Discarding config update record with invalid key: {}", key); } @@ -463,6 +537,47 @@ private void processDeleteConnectorRecord(String connectorName, SchemaAndValue s } } + /** + * process restarte connector + * + * @param connectorName + * @param schemaAndValue + */ + private void processRestartConnectorRecord(String connectorName, SchemaAndValue schemaAndValue) { + processDeleteConnectorRecord(connectorName, schemaAndValue); + processTargetStateRecord(connectorName, schemaAndValue); + } + + /** + * process restart task + * + * @param connectorName + * @param taskNum + * @param schemaAndValue + */ + private void processRestartTaskRecord(String connectorName, String taskNum, SchemaAndValue schemaAndValue) { + if (!connectorKeyValueStore.containsKey(connectorName)) { + return; + } + Struct value = (Struct) schemaAndValue.value(); + Object epoch = value.get(FIELD_EPOCH); + // validate + ConnectKeyValue oldConfig = connectorKeyValueStore.get(connectorName); + Struct struct = (Struct) schemaAndValue.value(); + Object targetState = struct.get(FIELD_STATE); + // config update + if ((Long) epoch > oldConfig.getEpoch()) { + // remove + connectorKeyValueStore.remove(connectorName); + taskKeyValueStore.remove(connectorName); + //start + TargetState state = TargetState.valueOf(targetState.toString()); + oldConfig.setTargetState(state); + // reblance + triggerListener(); + } + } + /** * process task config record * @@ -503,14 +618,14 @@ private void processTargetStateRecord(String connectorName, SchemaAndValue schem if (!(targetState instanceof String)) { // target state log.error("Invalid data for target state for connector '{}': 'state' field should be a String but is {}", - connectorName, className(targetState)); + connectorName, className(targetState)); return; } Object epoch = struct.get(FIELD_EPOCH); if (!(epoch instanceof Long)) { // epoch log.error("Invalid data for epoch for connector '{}': 'epoch' field should be a Long but is {}", - connectorName, className(epoch)); + connectorName, className(epoch)); return; } @@ -538,21 +653,21 @@ private boolean mergeConnectConfig(String connectName, SchemaAndValue schemaAndV if (!(targetState instanceof String)) { // target state log.error("Invalid data for target state for connector '{}': 'state' field should be a String but is {}", - connectName, className(targetState)); + connectName, className(targetState)); return false; } Object epoch = value.get(FIELD_EPOCH); if (!(epoch instanceof Long)) { // epoch log.error("Invalid data for epoch for connector '{}': 'state' field should be a long but is {}", - connectName, className(epoch)); + connectName, className(epoch)); return false; } Object props = value.get(FIELD_PROPS); if (!(props instanceof Map)) { // properties log.error("Invalid data for properties for connector '{}': 'state' field should be a Map but is {}", - connectName, className(props)); + connectName, className(props)); return false; } // new configs @@ -609,4 +724,4 @@ private ConnectorTaskId parseTaskId(String key) { private String className(Object o) { return o != null ? o.getClass().getName() : "null"; } -} \ No newline at end of file +} diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryConfigManagementServiceImpl.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryConfigManagementServiceImpl.java index 4db142db..67fb9fb7 100644 --- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryConfigManagementServiceImpl.java +++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryConfigManagementServiceImpl.java @@ -16,11 +16,13 @@ */ package org.apache.rocketmq.connect.runtime.service.memory; - import io.openmessaging.connector.api.component.connector.Connector; import io.openmessaging.connector.api.component.task.sink.SinkConnector; import io.openmessaging.connector.api.component.task.source.SourceConnector; import io.openmessaging.connector.api.data.RecordConverter; +import io.openmessaging.connector.api.data.Schema; +import io.openmessaging.connector.api.data.SchemaBuilder; +import io.openmessaging.connector.api.data.Struct; import io.openmessaging.connector.api.errors.ConnectException; import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue; import org.apache.rocketmq.connect.runtime.common.LoggerName; @@ -33,6 +35,7 @@ import org.apache.rocketmq.connect.runtime.service.AbstractConfigManagementService; import org.apache.rocketmq.connect.runtime.service.StagingMode; import org.apache.rocketmq.connect.runtime.store.MemoryBasedKeyValueStore; +import org.apache.rocketmq.connect.runtime.utils.datasync.DataSynchronizer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,7 +56,30 @@ public class MemoryConfigManagementServiceImpl extends AbstractConfigManagementS */ private ConnectorConfigUpdateListener connectorConfigUpdateListener; - public MemoryConfigManagementServiceImpl() {} + public static final String RESTART_CONNECTOR_PREFIX = "restart-"; + + public static final String TASK_PREFIX = "task-"; + + private static final String FIELD_EPOCH = "epoch"; + + /** + * Synchronize config with other workers. + */ + private DataSynchronizer dataSynchronizer; + + // converter + public RecordConverter converter; + + public MemoryConfigManagementServiceImpl() { + } + + public static String RESTART_CONNECTOR_KEY(String connectorName) { + return RESTART_CONNECTOR_PREFIX + connectorName; + } + + public static String RESTART_TASK_KEY(String connectorName, Integer task) { + return RESTART_CONNECTOR_PREFIX + TASK_PREFIX + connectorName + "-" + task; + } @Override public void initialize(WorkerConfig workerConfig, RecordConverter converter, Plugin plugin) { @@ -77,6 +103,20 @@ public void stop() { taskKeyValueStore.persist(); } + /** + * restart connector + */ + public static final Schema CONNECTOR_RESTART_CONFIGURATION_V0 = SchemaBuilder.struct() + .field(FIELD_EPOCH, SchemaBuilder.int64().build()) + .build(); + + /** + * restart task + */ + public static final Schema TASK_RESTART_CONFIGURATION_V0 = SchemaBuilder.struct() + .field(FIELD_EPOCH, SchemaBuilder.int64().build()) + .build(); + /** * get all connector configs enabled * @@ -87,7 +127,6 @@ public Map getConnectorConfigs() { return connectorKeyValueStore.getKVMap(); } - @Override public String putConnectorConfig(String connectorName, ConnectKeyValue configs) { /** @@ -134,6 +173,34 @@ public void deleteConnectorConfig(String connectorName) { triggerListener(); } + @Override + public void restartConnector(String connectorName) { + if (!connectorKeyValueStore.containsKey(connectorName)) { + throw new ConnectException("Connector [" + connectorName + "] does not exist"); + } + // new struct + Struct struct = new Struct(CONNECTOR_RESTART_CONFIGURATION_V0); + struct.put(FIELD_EPOCH, System.currentTimeMillis()); + + byte[] config = converter.fromConnectData(topic, CONNECTOR_RESTART_CONFIGURATION_V0, struct); + dataSynchronizer.send(RESTART_CONNECTOR_KEY(connectorName), config); + } + + @Override + public void restartTask(String connectorName, Integer task) { + if (!connectorKeyValueStore.containsKey(connectorName)) { + throw new ConnectException("Connector [" + connectorName + "] does not exist"); + } else if (!taskKeyValueStore.containsKey(connectorName)) { + throw new ConnectException("Task [" + connectorName + "/" + task + "] does not exist"); + } + // new struct + Struct struct = new Struct(TASK_RESTART_CONFIGURATION_V0); + struct.put(FIELD_EPOCH, System.currentTimeMillis()); + + byte[] config = converter.fromConnectData(topic, TASK_RESTART_CONFIGURATION_V0, struct); + dataSynchronizer.send(RESTART_TASK_KEY(connectorName, task), config); + } + /** * pause connector * @@ -167,7 +234,6 @@ public void resumeConnector(String connectorName) { triggerListener(); } - @Override public Map> getTaskConfigs() { return taskKeyValueStore.getKVMap(); diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/ServerResponseMocker.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/ServerResponseMocker.java index 16f3ad40..fa6328fc 100644 --- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/ServerResponseMocker.java +++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/ServerResponseMocker.java @@ -98,7 +98,7 @@ public void shutdown() { } Future future = eventLoopGroup.shutdownGracefully(); try { - future.get(); + Object o = future.get(); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/controller/distributed/DistributedConnectControllerTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/controller/distributed/DistributedConnectControllerTest.java index b02a9684..d4f4cfaa 100644 --- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/controller/distributed/DistributedConnectControllerTest.java +++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/controller/distributed/DistributedConnectControllerTest.java @@ -79,20 +79,21 @@ public void before() throws InterruptedException, MalformedURLException { nameServerMocker = NameServerMocker.startByDefaultConf(9876, 10911); brokerMocker = ServerResponseMocker.startServer(10911, "Hello World".getBytes(StandardCharsets.UTF_8)); workerConfig.setNamesrvAddr("127.0.0.1:9876"); + workerConfig.setOperationTimeout(20000); recordConverter = new JsonConverter(); clusterManagementService.initialize(workerConfig); stateManagementService.initialize(workerConfig, recordConverter); URL url = new URL("file://src/test/java/org/apache/rocketmq/connect/runtime"); - URL[] urls = new URL[]{}; + URL[] urls = new URL[] {}; pluginClassLoader = new PluginClassLoader(url, urls); Thread.currentThread().setContextClassLoader(pluginClassLoader); distributedConnectController = new DistributedConnectController( - plugin, - distributedConfig, - clusterManagementService, - configManagementService, - positionManagementService, - stateManagementService ); + plugin, + distributedConfig, + clusterManagementService, + configManagementService, + positionManagementService, + stateManagementService); } @After diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/controller/distributed/TestConfigManagementService.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/controller/distributed/TestConfigManagementService.java index 2a1f81ea..a10e8dff 100644 --- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/controller/distributed/TestConfigManagementService.java +++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/controller/distributed/TestConfigManagementService.java @@ -58,6 +58,16 @@ public void deleteConnectorConfig(String connectorName) { } + @Override + public void restartConnector(String connectorName) { + + } + + @Override + public void restartTask(String connectorName, Integer task) { + + } + @Override public void pauseConnector(String connectorName) { }