Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #383]support failed task after connector restart #386

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,25 @@ public void deleteConnectorConfig(String connectorName) {
configManagementService.deleteConnectorConfig(connectorName);
}

/**
* Restart the connector with the specified connector name in the cluster.
*
* @param connectorName
*/
public void restartConnector(String connectorName) {
configManagementService.restartConnector(connectorName);
}

/**
* Restart the task with the specified task name in the cluster.
*
* @param connectorName
* @param task
*/
public void restartTask(String connectorName, Integer task) {
configManagementService.restartTask(connectorName, task);
}

/**
* Pause the connector. This call will asynchronously suspend processing by the connector and all
* of its tasks.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ public RestHandler(AbstractConnectController connectController) {
app.get("/connectors/{connectorName}/stop", this::handleStopConnector);
app.get("/connectors/stop/all", this::handleStopAllConnector);

// restart connector
app.get("/connectors/{connectorName}/restart", this::handleRestartConnector);
app.get("/connectors/{connectorName}/tasks/{task}/restart", this::handleRestartTask);

// pause & resume
app.get("/connectors/{connectorName}/pause", this::handlePauseConnector);
app.get("/connectors/{connectorName}/resume", this::handleResumeConnector);
Expand Down Expand Up @@ -236,6 +240,28 @@ private void handleStopAllConnector(Context context) {
}
}

private void handleRestartConnector(Context context) {
try {
String connectorName = context.pathParam(CONNECTOR_NAME);
connectController.restartConnector(connectorName);
context.json(new HttpResponse<>(context.status(), "Connector [" + connectorName + "] restarted successfully"));
} catch (Exception e) {
log.error("Restart connector failed .", e);
context.json(new ErrorMessage(HttpStatus.INTERNAL_SERVER_ERROR_500, e.getMessage()));
}
}

public void handleRestartTask(Context context) {
try {
String connectorName = context.pathParam(CONNECTOR_NAME);
Integer task = Integer.valueOf(context.pathParam(TASK_NAME));
connectController.restartTask(connectorName, task);
context.json(new HttpResponse<>(context.status(), "Task [" + connectorName + "/ " + task + "] restarted successfully"));
} catch (Exception ex) {
log.error("Restart task failed .", ex);
context.json(new ErrorMessage(HttpStatus.INTERNAL_SERVER_ERROR_500, ex.getMessage()));
}
}

private void handlePauseConnector(Context context) {
String connectorName = context.pathParam(CONNECTOR_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,21 @@ default void configure(WorkerConfig config) {
*/
void deleteConnectorConfig(String connectorName);

/**
* restart the connector with the specified connector name in the cluster.
*
* @param connectorName
*/
void restartConnector(String connectorName);

/**
* restart the task with the specified task in the cluster.
*
* @param connectorName
* @param task
*/
void restartTask(String connectorName, Integer task);

/**
* pause connector
*
Expand Down
Loading