diff --git a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/errorcode/LinkisRpcErrorCodeSummary.java b/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/errorcode/LinkisRpcErrorCodeSummary.java index 3723ca145d..b87e730994 100644 --- a/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/errorcode/LinkisRpcErrorCodeSummary.java +++ b/linkis-commons/linkis-rpc/src/main/java/org/apache/linkis/rpc/errorcode/LinkisRpcErrorCodeSummary.java @@ -30,6 +30,8 @@ public enum LinkisRpcErrorCodeSummary implements LinkisErrorCode { 10004, "The corresponding anti-sequence class:{0} failed to initialize(对应的反序列类:{0} 初始化失败)"), APPLICATION_IS_NOT_EXISTS( 10051, "The instance:{0} of application {1} does not exist(应用程序:{0} 的实例:{1} 不存在)."), + + INSTANCE_ERROR(10052, "The instance:{0} is error should ip:port."), RPC_INIT_ERROR(10054, "Asyn RPC Consumer Thread has stopped!(Asyn RPC Consumer 线程已停止!)"); /** 错误码 */ diff --git a/linkis-spring-cloud-services/linkis-service-gateway/linkis-spring-cloud-gateway/src/main/java/org/apache/linkis/gateway/springcloud/loadbalancer/ServiceInstancePriorityLoadBalancer.java b/linkis-spring-cloud-services/linkis-service-gateway/linkis-spring-cloud-gateway/src/main/java/org/apache/linkis/gateway/springcloud/loadbalancer/ServiceInstancePriorityLoadBalancer.java index a195478b8a..673559789c 100644 --- a/linkis-spring-cloud-services/linkis-service-gateway/linkis-spring-cloud-gateway/src/main/java/org/apache/linkis/gateway/springcloud/loadbalancer/ServiceInstancePriorityLoadBalancer.java +++ b/linkis-spring-cloud-services/linkis-service-gateway/linkis-spring-cloud-gateway/src/main/java/org/apache/linkis/gateway/springcloud/loadbalancer/ServiceInstancePriorityLoadBalancer.java @@ -18,6 +18,8 @@ package org.apache.linkis.gateway.springcloud.loadbalancer; import org.apache.linkis.gateway.springcloud.constant.GatewayConstant; +import org.apache.linkis.rpc.errorcode.LinkisRpcErrorCodeSummary; +import org.apache.linkis.rpc.exception.NoInstanceExistsException; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; @@ -32,6 +34,7 @@ import org.springframework.cloud.loadbalancer.core.SelectedInstanceCallback; import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier; +import java.text.MessageFormat; import java.util.List; import java.util.Objects; import java.util.Random; @@ -94,25 +97,37 @@ private Response processInstanceResponse( private Response getInstanceResponse( List instances, String clientIp) { if (instances.isEmpty()) { - if (log.isWarnEnabled()) { - log.warn("No servers available for service: " + serviceId); - } + log.warn("No servers available for service: " + serviceId); return new EmptyResponse(); } int pos = this.position.incrementAndGet() & Integer.MAX_VALUE; - if (StringUtils.isEmpty(clientIp)) { + if (StringUtils.isBlank(clientIp)) { return new DefaultResponse(instances.get(pos % instances.size())); } + String[] ipAndPort = clientIp.split(":"); + if (ipAndPort.length != 2) { + throw new NoInstanceExistsException( + LinkisRpcErrorCodeSummary.INSTANCE_ERROR.getErrorCode(), + MessageFormat.format(LinkisRpcErrorCodeSummary.INSTANCE_ERROR.getErrorDesc(), clientIp)); + } + ServiceInstance chooseInstance = null; for (ServiceInstance instance : instances) { - String[] ipAndPort = clientIp.split(":"); - if (ipAndPort.length == 2 - && Objects.equals(ipAndPort[0], instance.getHost()) + if (Objects.equals(ipAndPort[0], instance.getHost()) && Objects.equals(ipAndPort[1], instance.getPort())) { - return new DefaultResponse(instance); + chooseInstance = instance; + break; } } - - return new DefaultResponse(instances.get(pos % instances.size())); + if (null == chooseInstance) { + throw new NoInstanceExistsException( + LinkisRpcErrorCodeSummary.APPLICATION_IS_NOT_EXISTS.getErrorCode(), + MessageFormat.format( + LinkisRpcErrorCodeSummary.APPLICATION_IS_NOT_EXISTS.getErrorDesc(), + clientIp, + serviceId)); + } else { + return new DefaultResponse(chooseInstance); + } } }