Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #383]support failed task after connector restart #386

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,25 @@ public void deleteConnectorConfig(String connectorName) {
configManagementService.deleteConnectorConfig(connectorName);
}

/**
* Restart the connector with the specified connector name in the cluster.
*
* @param connectorName
*/
public void restartConnectorConfig(String connectorName) {
sunheyi6 marked this conversation as resolved.
Show resolved Hide resolved
configManagementService.restartConnectorConfig(connectorName);
}

/**
* Restart the task with the specified task name in the cluster.
*
* @param connectorName
* @param task
*/
public void restartTaskConfig(String connectorName, Integer task) {
sunheyi6 marked this conversation as resolved.
Show resolved Hide resolved
configManagementService.restartTaskConfig(connectorName, task);
}

/**
* Pause the connector. This call will asynchronously suspend processing by the connector and all
* of its tasks.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ public RestHandler(AbstractConnectController connectController) {
app.get("/connectors/{connectorName}/stop", this::handleStopConnector);
app.get("/connectors/stop/all", this::handleStopAllConnector);

// restart connector
app.get("/connectors/{connectorName}/restart", this::handleRestartConnector);
app.get("/connectors/{connectorName}/tasks/{task}/restart", this::handleRestartTask);

// pause & resume
app.get("/connectors/{connectorName}/pause", this::handlePauseConnector);
app.get("/connectors/{connectorName}/resume", this::handleResumeConnector);
Expand Down Expand Up @@ -236,6 +240,28 @@ private void handleStopAllConnector(Context context) {
}
}

private void handleRestartConnector(Context context) {
try {
String connectorName = context.pathParam(CONNECTOR_NAME);
connectController.restartConnectorConfig(connectorName);
context.json(new HttpResponse<>(context.status(), "Connector [" + connectorName + "] restarted successfully"));
} catch (Exception e) {
log.error("Restart connector failed .", e);
context.json(new ErrorMessage(HttpStatus.INTERNAL_SERVER_ERROR_500, e.getMessage()));
}
}

public void handleRestartTask(Context context) {
try {
String connectorName = context.pathParam(CONNECTOR_NAME);
Integer task = Integer.valueOf(context.pathParam(TASK_NAME));
connectController.restartTaskConfig(connectorName, task);
context.json(new HttpResponse<>(context.status(), "Task [" + connectorName + "/ " + task + "] restarted successfully"));
} catch (Exception ex) {
log.error("Restart task failed .", ex);
context.json(new ErrorMessage(HttpStatus.INTERNAL_SERVER_ERROR_500, ex.getMessage()));
}
}

private void handlePauseConnector(Context context) {
String connectorName = context.pathParam(CONNECTOR_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,21 @@ default void configure(WorkerConfig config) {
*/
void deleteConnectorConfig(String connectorName);

/**
* restart the connector with the specified connector name in the cluster.
*
* @param connectorName
*/
void restartConnectorConfig(String connectorName);
sunheyi6 marked this conversation as resolved.
Show resolved Hide resolved

/**
* restart the task with the specified task in the cluster.
*
* @param connectorName
* @param task
*/
void restartTaskConfig(String connectorName, Integer task);
sunheyi6 marked this conversation as resolved.
Show resolved Hide resolved

/**
* pause connector
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,20 @@ public static String TASK_KEY(ConnectorTaskId taskId) {
}

public static final String DELETE_CONNECTOR_PREFIX = "delete-";
public static final String RESTART_CONNECTOR_KEY = "restart-connector-";

public static String DELETE_CONNECTOR_KEY(String connectorName) {
return DELETE_CONNECTOR_PREFIX + connectorName;
}

public static String RESTART_CONNECTOR_KEY(String connectorName) {
return RESTART_CONNECTOR_KEY + connectorName;
}

public static String RESTART_TASK_KEY(String connectorName, Integer task) {
return RESTART_CONNECTOR_KEY + connectorName + "-" + task;
}

private static final String FIELD_STATE = "state";
private static final String FIELD_EPOCH = "epoch";
private static final String FIELD_PROPS = "properties";
Expand Down Expand Up @@ -121,6 +130,20 @@ public static String DELETE_CONNECTOR_KEY(String connectorName) {
.field(FIELD_EPOCH, SchemaBuilder.int64().build())
.build();

/**
* restart connector
*/
public static final Schema CONNECTOR_RESTART_CONFIGURATION_V0 = SchemaBuilder.struct()
.field(FIELD_EPOCH, SchemaBuilder.int64().build())
.build();

/**
* restart task
*/
public static final Schema TASK_RESTART_CONFIGURATION_V0 = SchemaBuilder.struct()
.field(FIELD_EPOCH, SchemaBuilder.int64().build())
.build();

/**
* task configuration
*/
Expand Down Expand Up @@ -290,6 +313,46 @@ public void deleteConnectorConfig(String connectorName) {
dataSynchronizer.send(DELETE_CONNECTOR_KEY(connectorName), config);
}

/**
* restart connector config
*
* @param connectorName
*/
@Override
public void restartConnectorConfig(String connectorName) {
if (!connectorKeyValueStore.containsKey(connectorName)) {
throw new ConnectException("Connector [" + connectorName + "] does not exist");
}
// new struct
Struct struct = new Struct(CONNECTOR_RESTART_CONFIGURATION_V0);
struct.put(FIELD_EPOCH, System.currentTimeMillis());

byte[] config = converter.fromConnectData(topic, CONNECTOR_RESTART_CONFIGURATION_V0, struct);
dataSynchronizer.send(RESTART_CONNECTOR_KEY(connectorName), config);
}

/**
* restart task config
*
* @param connectorName
* @param task
*/
@Override
public void restartTaskConfig(String connectorName, Integer task) {
if (!connectorKeyValueStore.containsKey(connectorName)) {
throw new ConnectException("Connector [" + connectorName + "] does not exist");
}
else if (!taskKeyValueStore.containsKey(connectorName)) {
throw new ConnectException("Task [" + connectorName + "/" + task + "] does not exist");
}
// new struct
Struct struct = new Struct(TASK_RESTART_CONFIGURATION_V0);
struct.put(FIELD_EPOCH, System.currentTimeMillis());

byte[] config = converter.fromConnectData(topic, TASK_RESTART_CONFIGURATION_V0, struct);
dataSynchronizer.send(RESTART_TASK_KEY(connectorName, task), config);
}

/**
* pause connector
*
Expand Down Expand Up @@ -609,4 +672,4 @@ private ConnectorTaskId parseTaskId(String key) {
private String className(Object o) {
return o != null ? o.getClass().getName() : "null";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,38 @@ public void deleteConnectorConfig(String connectorName) {
triggerListener();
}

@Override
public void restartConnectorConfig(String connectorName) {
if (!connectorKeyValueStore.containsKey(connectorName)) {
throw new ConnectException("Connector [" + connectorName + "] does not exist");
}

stop();
ConnectKeyValue config = connectorKeyValueStore.get(connectorName);
config.setEpoch(System.currentTimeMillis());
config.setTargetState(TargetState.STARTED);
connectorKeyValueStore.put(connectorName, config.nextGeneration());
triggerListener();
}

@Override
public void restartTaskConfig(String connectorName, Integer task) {
if (!connectorKeyValueStore.containsKey(connectorName)) {
throw new ConnectException("Connector [" + connectorName + "] does not exist");
}
else if (!taskKeyValueStore.containsKey(connectorName))
{
throw new ConnectException("Task [" + connectorName + "/" + task + "] does not exist");
}

stop();
ConnectKeyValue config = connectorKeyValueStore.get(connectorName);
config.setEpoch(System.currentTimeMillis());
config.setTargetState(TargetState.STARTED);
connectorKeyValueStore.put(connectorName, config.nextGeneration());
triggerListener();
}

/**
* pause connector
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public void shutdown() {
}
Future<?> future = eventLoopGroup.shutdownGracefully();
try {
future.get();
Object o = future.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,16 @@ public void deleteConnectorConfig(String connectorName) {

}

@Override
public void restartConnectorConfig(String connectorName) {

}

@Override
public void restartTaskConfig(String connectorName, Integer task) {

}

@Override
public void pauseConnector(String connectorName) {
}
Expand Down