From 73f420e38e622af0f0e88dc3be10d1da1dc7a341 Mon Sep 17 00:00:00 2001 From: songxiaosheng Date: Mon, 4 Sep 2023 08:14:08 +0800 Subject: [PATCH] remove offline server ip (#2251) * remove offline server ip * :memo: doc remove offline server * :memo: doc remove offline server --- .../lite/internal/server/ServerService.java | 63 +++++++++++++++---- .../lite/internal/setup/SetUpFacade.java | 1 + 2 files changed, 52 insertions(+), 12 deletions(-) diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/server/ServerService.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/server/ServerService.java index b2bb8a223e..011e4c17c8 100644 --- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/server/ServerService.java +++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/server/ServerService.java @@ -18,6 +18,7 @@ package org.apache.shardingsphere.elasticjob.lite.internal.server; import com.google.common.base.Strings; +import org.apache.commons.lang.StringUtils; import org.apache.shardingsphere.elasticjob.lite.internal.instance.InstanceNode; import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobRegistry; import org.apache.shardingsphere.elasticjob.lite.internal.storage.JobNodeStorage; @@ -25,27 +26,30 @@ import org.apache.shardingsphere.elasticjob.infra.concurrent.BlockUtils; import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; /** * Server service. */ public final class ServerService { - + private final String jobName; - + private final JobNodeStorage jobNodeStorage; - + private final ServerNode serverNode; - + public ServerService(final CoordinatorRegistryCenter regCenter, final String jobName) { this.jobName = jobName; jobNodeStorage = new JobNodeStorage(regCenter, jobName); serverNode = new ServerNode(jobName); } - + /** * Persist online status of job server. - * + * * @param enabled enable server or not */ public void persistOnline(final boolean enabled) { @@ -53,10 +57,10 @@ public void persistOnline(final boolean enabled) { jobNodeStorage.fillJobNode(serverNode.getServerNode(JobRegistry.getInstance().getJobInstance(jobName).getServerIp()), enabled ? ServerStatus.ENABLED.name() : ServerStatus.DISABLED.name()); } } - + /** * Judge has available servers or not. - * + * * @return has available servers or not */ public boolean hasAvailableServers() { @@ -68,17 +72,17 @@ public boolean hasAvailableServers() { } return false; } - + /** * Judge is available server or not. - * + * * @param ip job server IP address * @return is available server or not */ public boolean isAvailableServer(final String ip) { return isEnableServer(ip) && hasOnlineInstances(ip); } - + private boolean hasOnlineInstances(final String ip) { for (String each : jobNodeStorage.getJobNodeChildrenKeys(InstanceNode.ROOT)) { if (each.startsWith(ip)) { @@ -87,7 +91,7 @@ private boolean hasOnlineInstances(final String ip) { } return false; } - + /** * Judge is server enabled or not. * @@ -102,4 +106,39 @@ public boolean isEnableServer(final String ip) { } return ServerStatus.ENABLED.name().equals(serverStatus); } + + /** + * Remove unuse serverIp. + * @return num of serverIp be remove + */ + public int removeOfflineServers() { + AtomicInteger affectNums = new AtomicInteger(); + List instances = jobNodeStorage.getJobNodeChildrenKeys(InstanceNode.ROOT); + if (instances == null || instances.isEmpty()) { + return affectNums.get(); + } + Set instanceIps = instances.stream() + .map(instance -> instance.split("@-@")[0]) + .collect(Collectors.toSet()); + if (instanceIps == null || instanceIps.isEmpty()) { + return affectNums.get(); + } + List serverIps = jobNodeStorage.getJobNodeChildrenKeys(ServerNode.ROOT); + if (serverIps == null || serverIps.isEmpty()) { + return affectNums.get(); + } + + serverIps.forEach(serverIp -> { + if (instanceIps.contains(serverIp)) { + return; + } + String status = jobNodeStorage.getJobNodeData(serverNode.getServerNode(serverIp)); + if (StringUtils.isNotBlank(status)) { + return; + } + jobNodeStorage.removeJobNodeIfExisted(serverNode.getServerNode(serverIp)); + affectNums.getAndIncrement(); + }); + return affectNums.get(); + } } diff --git a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/setup/SetUpFacade.java b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/setup/SetUpFacade.java index f0d3fa83b4..da154ec1e0 100644 --- a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/setup/SetUpFacade.java +++ b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/setup/SetUpFacade.java @@ -75,6 +75,7 @@ public void registerStartUpInfo(final boolean enabled) { if (!reconcileService.isRunning()) { reconcileService.startAsync(); } + serverService.removeOfflineServers(); } /**