Skip to content

Commit

Permalink
[ISSUE #432] add redress running connectors status
Browse files Browse the repository at this point in the history
  • Loading branch information
Slideee committed Apr 7, 2023
1 parent eb81225 commit 434380d
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@

import static org.apache.rocketmq.connect.runtime.connectorwrapper.status.AbstractStatus.State.PAUSED;
import static org.apache.rocketmq.connect.runtime.connectorwrapper.status.AbstractStatus.State.RUNNING;
import static org.apache.rocketmq.connect.runtime.connectorwrapper.status.AbstractStatus.State.UNASSIGNED;

/**
* A worker to schedule all connectors and tasks in a process.
Expand Down Expand Up @@ -564,7 +565,8 @@ public Set<Runnable> getCleanedStoppedTasks() {
}

public void maintainConnectorState() {

// STEP 1: redress running connectors status
redressRunningConnectors();
}

/**
Expand Down Expand Up @@ -935,6 +937,18 @@ private void redressRunningStatus(WorkerTask workerTask) {
}
}

private void redressRunningConnectors() {
for (WorkerConnector connector : connectors.values()) {
ConnectorStatus connectorStatus = stateManagementService.get(connector.getConnectorName());
if (connectorStatus != null && connectorStatus.getState() == UNASSIGNED && connector.getKeyValue().getTargetState() == TargetState.STARTED &&
connector.getState() == WorkerConnector.State.STARTED) {
ConnectorStatus redressStatus = new ConnectorStatus(connector.getConnectorName(), RUNNING, workerConfig.getWorkerId(), System.currentTimeMillis());
log.warn("Connector {}, Old connector status is {}, new connector status {}", connector.getConnectorName(), connectorStatus, redressStatus);
stateManagementService.put(redressStatus);
}
}
}

private Map<String, List<ConnectKeyValue>> newTasks(Map<String, List<ConnectKeyValue>> taskConfigs) {
Map<String, List<ConnectKeyValue>> newTasks = new HashMap<>();
for (String connectorName : taskConfigs.keySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,10 +369,18 @@ public String toString() {
return sb;
}

private enum State {
public enum State {
INIT,
STOPPED,
STARTED,
FAILED,
}

public State getState() {
return state;
}

public void setState(State state) {
this.state = state;
}
}

0 comments on commit 434380d

Please sign in to comment.