From 434380dcf6492f9685da456940604e382d4cc191 Mon Sep 17 00:00:00 2001 From: yechun Date: Wed, 1 Mar 2023 13:13:19 +0800 Subject: [PATCH] [ISSUE #432] add redress running connectors status --- .../connect/runtime/connectorwrapper/Worker.java | 16 +++++++++++++++- .../connectorwrapper/WorkerConnector.java | 10 +++++++++- 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java index f6ff52cf7..9c0618abb 100644 --- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java +++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java @@ -80,6 +80,7 @@ import static org.apache.rocketmq.connect.runtime.connectorwrapper.status.AbstractStatus.State.PAUSED; import static org.apache.rocketmq.connect.runtime.connectorwrapper.status.AbstractStatus.State.RUNNING; +import static org.apache.rocketmq.connect.runtime.connectorwrapper.status.AbstractStatus.State.UNASSIGNED; /** * A worker to schedule all connectors and tasks in a process. @@ -564,7 +565,8 @@ public Set getCleanedStoppedTasks() { } public void maintainConnectorState() { - + // STEP 1: redress running connectors status + redressRunningConnectors(); } /** @@ -935,6 +937,18 @@ private void redressRunningStatus(WorkerTask workerTask) { } } + private void redressRunningConnectors() { + for (WorkerConnector connector : connectors.values()) { + ConnectorStatus connectorStatus = stateManagementService.get(connector.getConnectorName()); + if (connectorStatus != null && connectorStatus.getState() == UNASSIGNED && connector.getKeyValue().getTargetState() == TargetState.STARTED && + connector.getState() == WorkerConnector.State.STARTED) { + ConnectorStatus redressStatus = new ConnectorStatus(connector.getConnectorName(), RUNNING, workerConfig.getWorkerId(), System.currentTimeMillis()); + log.warn("Connector {}, Old connector status is {}, new connector status {}", connector.getConnectorName(), connectorStatus, redressStatus); + stateManagementService.put(redressStatus); + } + } + } + private Map> newTasks(Map> taskConfigs) { Map> newTasks = new HashMap<>(); for (String connectorName : taskConfigs.keySet()) { diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerConnector.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerConnector.java index 0c146555a..b266cdbdf 100644 --- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerConnector.java +++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerConnector.java @@ -369,10 +369,18 @@ public String toString() { return sb; } - private enum State { + public enum State { INIT, STOPPED, STARTED, FAILED, } + + public State getState() { + return state; + } + + public void setState(State state) { + this.state = state; + } }